diff --git a/build.gradle b/build.gradle index 7d23717f93..8b682050bb 100644 --- a/build.gradle +++ b/build.gradle @@ -184,6 +184,7 @@ project(":samza-core_$scalaVersion") { compile "org.eclipse.jetty:jetty-webapp:$jettyVersion" compile "org.scala-lang:scala-library:$scalaLibVersion" compile "org.slf4j:slf4j-api:$slf4jVersion" + compile "net.jodah:failsafe:$failsafeVersion" testCompile project(":samza-api").sourceSets.test.output testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-core:$mockitoVersion" diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 3c4c3a9303..b33ab82e9a 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -45,4 +45,5 @@ yarnVersion = "2.6.1" zkClientVersion = "0.8" zookeeperVersion = "3.4.6" + failsafeVersion = "1.1.0" } diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java index 88bc7df33f..9ef4c1baa2 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java @@ -43,11 +43,11 @@ * @param the type of the value in this table */ public class RemoteReadWriteTable extends RemoteReadableTable implements ReadWriteTable { - private final TableWriteFunction writeFn; private DefaultTableWriteMetrics writeMetrics; @VisibleForTesting + final TableWriteFunction writeFn; final TableRateLimiter writeRateLimiter; public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn, diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java index 3186fee94a..b3d82f3463 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java +++ 
b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java @@ -80,10 +80,10 @@ public class RemoteReadableTable implements ReadableTable { protected final ExecutorService callbackExecutor; protected final ExecutorService tableExecutor; - private final TableReadFunction readFn; private DefaultTableReadMetrics readMetrics; @VisibleForTesting + final TableReadFunction readFn; final TableRateLimiter readRateLimiter; /** diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java index a8d419d1f3..537ff878f7 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java @@ -24,6 +24,7 @@ import org.apache.samza.operators.BaseTableDescriptor; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.table.utils.SerdeUtils; import org.apache.samza.util.EmbeddedTaggedRateLimiter; import org.apache.samza.util.RateLimiter; @@ -70,6 +71,9 @@ public class RemoteTableDescriptor extends BaseTableDescriptor readCreditFn; private TableRateLimiter.CreditFunction writeCreditFn; + private TableRetryPolicy readRetryPolicy; + private TableRetryPolicy writeRetryPolicy; + // By default execute future callbacks on the native client threads // ie. no additional thread pool for callbacks. 
private int asyncCallbackPoolSize = -1; @@ -115,13 +119,23 @@ public TableSpec getTableSpec() { "write credit function", writeCreditFn)); } + if (readRetryPolicy != null) { + tableSpecConfig.put(RemoteTableProvider.READ_RETRY_POLICY, SerdeUtils.serialize( + "read retry policy", readRetryPolicy)); + } + + if (writeRetryPolicy != null) { + tableSpecConfig.put(RemoteTableProvider.WRITE_RETRY_POLICY, SerdeUtils.serialize( + "write retry policy", writeRetryPolicy)); + } + tableSpecConfig.put(RemoteTableProvider.ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize)); return new TableSpec(tableId, serde, RemoteTableProviderFactory.class.getName(), tableSpecConfig); } /** - * Use specified TableReadFunction with remote table. + * Use specified TableReadFunction with remote table (no retry policy is supplied). * @param readFn read function instance * @return this table descriptor instance */ @@ -132,7 +146,7 @@ public RemoteTableDescriptor withReadFunction(TableReadFunction read } /** - * Use specified TableWriteFunction with remote table. + * Use specified TableWriteFunction with remote table (no retry policy is supplied). * @param writeFn write function instance * @return this table descriptor instance */ @@ -142,6 +156,34 @@ public RemoteTableDescriptor withWriteFunction(TableWriteFunction wr return this; } + /** + * Use specified TableReadFunction with remote table and a retry policy. + * @param readFn read function instance + * @param retryPolicy retry policy for the read function + * @return this table descriptor instance + */ + public RemoteTableDescriptor withReadFunction(TableReadFunction readFn, TableRetryPolicy retryPolicy) { + Preconditions.checkNotNull(readFn, "null read function"); + Preconditions.checkNotNull(retryPolicy, "null retry policy"); + this.readFn = readFn; + this.readRetryPolicy = retryPolicy; + return this; + } + + /** + * Use specified TableWriteFunction with remote table and a retry policy. 
+ * @param writeFn write function instance + * @param retryPolicy retry policy for the write function + * @return this table descriptor instance + */ + public RemoteTableDescriptor withWriteFunction(TableWriteFunction writeFn, TableRetryPolicy retryPolicy) { + Preconditions.checkNotNull(writeFn, "null write function"); + Preconditions.checkNotNull(retryPolicy, "null retry policy"); + this.writeFn = writeFn; + this.writeRetryPolicy = retryPolicy; + return this; + } + /** * Specify a rate limiter along with credit functions to map a table record (as KV) to the amount * of credits to be charged from the rate limiter for table read and write operations. diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java index 6c5d9b38ed..cae0bbdef1 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java @@ -25,11 +25,16 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.retry.RetriableReadFunction; +import org.apache.samza.table.retry.RetriableWriteFunction; +import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.table.utils.BaseTableProvider; import org.apache.samza.table.utils.SerdeUtils; +import org.apache.samza.table.utils.TableMetricsUtil; import org.apache.samza.util.RateLimiter; import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG; @@ -47,6 +52,8 @@ public class RemoteTableProvider extends BaseTableProvider { static final String READ_CREDIT_FN = "io.read.credit.func"; static final String WRITE_CREDIT_FN = "io.write.credit.func"; static final 
String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size"; + static final String READ_RETRY_POLICY = "io.read.retry.policy"; + static final String WRITE_RETRY_POLICY = "io.write.retry.policy"; private final boolean readOnly; private final List> tables = new ArrayList<>(); @@ -58,6 +65,7 @@ public class RemoteTableProvider extends BaseTableProvider { */ private static Map tableExecutors = new ConcurrentHashMap<>(); private static Map callbackExecutors = new ConcurrentHashMap<>(); + private static ScheduledExecutorService retryExecutor; public RemoteTableProvider(TableSpec tableSpec) { super(tableSpec); @@ -72,7 +80,7 @@ public Table getTable() { RemoteReadableTable table; String tableId = tableSpec.getId(); - TableReadFunction readFn = getReadFn(); + TableReadFunction readFn = getReadFn(); RateLimiter rateLimiter = deserializeObject(RATE_LIMITER); if (rateLimiter != null) { rateLimiter.init(containerContext.config, taskContext); @@ -83,11 +91,33 @@ public Table getTable() { TableRateLimiter.CreditFunction writeCreditFn; TableRateLimiter writeRateLimiter = null; + TableRetryPolicy readRetryPolicy = deserializeObject(READ_RETRY_POLICY); + TableRetryPolicy writeRetryPolicy = readOnly ? null : deserializeObject(WRITE_RETRY_POLICY); + + if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) { + retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> { + Thread thread = new Thread(runnable); + thread.setName("table-retry-executor"); + thread.setDaemon(true); + return thread; + }); + } + + if (readRetryPolicy != null) { + readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor); + } + + TableWriteFunction writeFn = getWriteFn(); + boolean isRateLimited = readRateLimiter.isRateLimited(); if (!readOnly) { writeCreditFn = deserializeObject(WRITE_CREDIT_FN); writeRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, writeCreditFn, RL_WRITE_TAG); isRateLimited |= writeRateLimiter.isRateLimited(); + // writeRetryPolicy is resolved above so that the shared retry executor exists before it is used here
+ if (writeRetryPolicy != null) { + writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor); + } } // Optional executor for future callback/completion. Shared by both read and write operations. @@ -116,10 +146,18 @@ public Table getTable() { table = new RemoteReadableTable(tableSpec.getId(), readFn, readRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId)); } else { - table = new RemoteReadWriteTable(tableSpec.getId(), readFn, getWriteFn(), readRateLimiter, + table = new RemoteReadWriteTable(tableSpec.getId(), readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId)); } + TableMetricsUtil metricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId); + if (readRetryPolicy != null) { + ((RetriableReadFunction) readFn).setMetrics(metricsUtil); + } + if (writeRetryPolicy != null) { + ((RetriableWriteFunction) writeFn).setMetrics(metricsUtil); + } + table.init(containerContext, taskContext); tables.add(table); return table; diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java index 5d0f96319e..4791779415 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java @@ -100,5 +100,12 @@ default CompletableFuture> getAllAsync(Collection keys) { .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().join()))); } + /** + * Determine whether the current operation can be retried with the last thrown exception. 
+ * @param exception exception thrown by a table operation + * @return whether the operation can be retried + */ + boolean isRetriable(Throwable exception); + // optionally implement readObject() to initialize transient states } diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java index 0ac3a0c204..d9d619f888 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java @@ -142,6 +142,13 @@ default CompletableFuture deleteAllAsync(Collection keys) { return CompletableFuture.allOf(Iterables.toArray(deleteFutures, CompletableFuture.class)); } + /** + * Determine whether the current operation can be retried with the last thrown exception. + * @param exception exception thrown by a table operation + * @return whether the operation can be retried + */ + boolean isRetriable(Throwable exception); + /** * Flush the remote store (optional) */ diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java new file mode 100644 index 0000000000..b2eccd884a --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.samza.SamzaException; + +import net.jodah.failsafe.AsyncFailsafe; +import net.jodah.failsafe.Failsafe; +import net.jodah.failsafe.RetryPolicy; + + +/** + * Helper class adapting the generic {@link TableRetryPolicy} to a failsafe {@link RetryPolicy} and + * creating failsafe retryer instances with proper metrics management. + */ +class FailsafeAdapter { + /** + * Convert the {@link TableRetryPolicy} to failsafe {@link RetryPolicy}. 
+ * @return the equivalent failsafe retry policy + */ + static RetryPolicy valueOf(TableRetryPolicy policy) { + RetryPolicy failSafePolicy = new RetryPolicy(); + + switch (policy.getBackoffType()) { + case NONE: + break; + + case FIXED: + failSafePolicy.withDelay(policy.getSleepTime().toMillis(), TimeUnit.MILLISECONDS); + break; + + case RANDOM: + failSafePolicy.withDelay(policy.getRandomMin().toMillis(), policy.getRandomMax().toMillis(), TimeUnit.MILLISECONDS); + break; + + case EXPONENTIAL: + failSafePolicy.withBackoff(policy.getSleepTime().toMillis(), policy.getExponentialMaxSleep().toMillis(), TimeUnit.MILLISECONDS, + policy.getExponentialFactor()); + break; + + default: + throw new SamzaException("Unknown retry policy type."); + } + + if (policy.getMaxDuration() != null) { + failSafePolicy.withMaxDuration(policy.getMaxDuration().toMillis(), TimeUnit.MILLISECONDS); + } + if (policy.getMaxAttempts() != null) { + failSafePolicy.withMaxRetries(policy.getMaxAttempts()); + } + if (policy.getJitter() != null && policy.getBackoffType() != TableRetryPolicy.BackoffType.RANDOM) { + failSafePolicy.withJitter(policy.getJitter().toMillis(), TimeUnit.MILLISECONDS); + } + + failSafePolicy.retryOn(e -> policy.getRetryPredicate().test(e)); + + return failSafePolicy; + } + + /** + * Obtain an async failsafe retryer instance with the specified policy, metrics, and executor service. 
+ * @param retryPolicy retry policy + * @param metrics retry metrics + * @param retryExec executor service for scheduling async retries + * @return {@link net.jodah.failsafe.AsyncFailsafe} instance + */ + static AsyncFailsafe failsafe(RetryPolicy retryPolicy, RetryMetrics metrics, ScheduledExecutorService retryExec) { + long startMs = System.currentTimeMillis(); + return Failsafe.with(retryPolicy).with(retryExec) + .onRetry(e -> metrics.retryCount.inc()) + .onRetriesExceeded(e -> { + metrics.retryTimer.update(System.currentTimeMillis() - startMs); + metrics.permFailureCount.inc(); + }) + .onSuccess((e, ctx) -> { + if (ctx.getExecutions() > 1) { + metrics.retryTimer.update(System.currentTimeMillis() - startMs); + } else { + metrics.successCount.inc(); + } + }); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java new file mode 100644 index 0000000000..1adddc0f49 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.table.retry; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Predicate; + +import org.apache.samza.SamzaException; +import org.apache.samza.table.remote.TableReadFunction; +import org.apache.samza.table.utils.TableMetricsUtil; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import net.jodah.failsafe.RetryPolicy; + +import static org.apache.samza.table.retry.FailsafeAdapter.failsafe; + + +/** + * Wrapper for a {@link TableReadFunction} instance to add common retry + * support with a {@link TableRetryPolicy}. This wrapper is created by + * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry + * policy is specified together with the {@link TableReadFunction}. + * + * Actual retry mechanism is provided by the failsafe library. Retry is + * attempted in an async way with a {@link ScheduledExecutorService}. 
+ * + * @param the type of the key in this table + * @param the type of the value in this table + */ +public class RetriableReadFunction implements TableReadFunction { + private final RetryPolicy retryPolicy; + private final TableReadFunction readFn; + private final ScheduledExecutorService retryExecutor; + + @VisibleForTesting + RetryMetrics retryMetrics; + + public RetriableReadFunction(TableRetryPolicy policy, TableReadFunction readFn, + ScheduledExecutorService retryExecutor) { + Preconditions.checkNotNull(policy); + Preconditions.checkNotNull(readFn); + Preconditions.checkNotNull(retryExecutor); + + this.readFn = readFn; + this.retryExecutor = retryExecutor; + Predicate retryPredicate = policy.getRetryPredicate(); + policy.withRetryPredicate((ex) -> readFn.isRetriable(ex) || retryPredicate.test(ex)); + this.retryPolicy = FailsafeAdapter.valueOf(policy); + } + + @Override + public CompletableFuture getAsync(K key) { + return failsafe(retryPolicy, retryMetrics, retryExecutor) + .future(() -> readFn.getAsync(key)) + .exceptionally(e -> { + throw new SamzaException("Failed to get the record for " + key + " after retries.", e); + }); + } + + @Override + public CompletableFuture> getAllAsync(Collection keys) { + return failsafe(retryPolicy, retryMetrics, retryExecutor) + .future(() -> readFn.getAllAsync(keys)) + .exceptionally(e -> { + throw new SamzaException("Failed to get the records for " + keys + " after retries.", e); + }); + } + + @Override + public boolean isRetriable(Throwable exception) { + return readFn.isRetriable(exception); + } + + /** + * Initialize retry-related metrics + * @param metricsUtil metrics util + */ + public void setMetrics(TableMetricsUtil metricsUtil) { + this.retryMetrics = new RetryMetrics("reader", metricsUtil); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java new file mode 100644 index 
0000000000..2f3f062ff9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Predicate; + +import org.apache.samza.SamzaException; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.remote.TableWriteFunction; +import org.apache.samza.table.utils.TableMetricsUtil; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import net.jodah.failsafe.RetryPolicy; + +import static org.apache.samza.table.retry.FailsafeAdapter.failsafe; + + +/** + * Wrapper for a {@link TableWriteFunction} instance to add common retry + * support with a {@link TableRetryPolicy}. This wrapper is created by + * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry + * policy is specified together with the {@link TableWriteFunction}. + * + * Actual retry mechanism is provided by the failsafe library. 
Retry is + * attempted in an async way with a {@link ScheduledExecutorService}. + * + * @param the type of the key in this table + * @param the type of the value in this table + */ +public class RetriableWriteFunction implements TableWriteFunction { + private final RetryPolicy retryPolicy; + private final TableWriteFunction writeFn; + private final ScheduledExecutorService retryExecutor; + + @VisibleForTesting + RetryMetrics retryMetrics; + + public RetriableWriteFunction(TableRetryPolicy policy, TableWriteFunction writeFn, + ScheduledExecutorService retryExecutor) { + Preconditions.checkNotNull(policy); + Preconditions.checkNotNull(writeFn); + Preconditions.checkNotNull(retryExecutor); + + this.writeFn = writeFn; + this.retryExecutor = retryExecutor; + Predicate retryPredicate = policy.getRetryPredicate(); + policy.withRetryPredicate((ex) -> writeFn.isRetriable(ex) || retryPredicate.test(ex)); + this.retryPolicy = FailsafeAdapter.valueOf(policy); + } + + @Override + public CompletableFuture putAsync(K key, V record) { + return failsafe(retryPolicy, retryMetrics, retryExecutor) + .future(() -> writeFn.putAsync(key, record)) + .exceptionally(e -> { + throw new SamzaException("Failed to put the record for " + key + " after retries.", e); + }); + } + + @Override + public CompletableFuture putAllAsync(Collection> records) { + return failsafe(retryPolicy, retryMetrics, retryExecutor) + .future(() -> writeFn.putAllAsync(records)) + .exceptionally(e -> { + throw new SamzaException("Failed to put records after retries.", e); + }); + } + + @Override + public CompletableFuture deleteAsync(K key) { + return failsafe(retryPolicy, retryMetrics, retryExecutor) + .future(() -> writeFn.deleteAsync(key)) + .exceptionally(e -> { + throw new SamzaException("Failed to delete the record for " + key + " after retries.", e); + }); + } + + @Override + public CompletableFuture deleteAllAsync(Collection keys) { + return failsafe(retryPolicy, retryMetrics, retryExecutor) + .future(() -> 
writeFn.deleteAllAsync(keys)) + .exceptionally(e -> { + throw new SamzaException("Failed to delete the records for " + keys + " after retries.", e); + }); + } + + @Override + public boolean isRetriable(Throwable exception) { + return writeFn.isRetriable(exception); + } + + /** + * Initialize retry-related metrics. + * @param metricsUtil metrics util + */ + public void setMetrics(TableMetricsUtil metricsUtil) { + this.retryMetrics = new RetryMetrics("writer", metricsUtil); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java new file mode 100644 index 0000000000..fbc511c8b6 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.utils.TableMetricsUtil; + + +/** + * Wrapper of retry-related metrics common to both {@link RetriableReadFunction} and + * {@link RetriableWriteFunction}. 
+ */ +class RetryMetrics { + /** + * Number of retries executed (excluding the first attempt) + */ + final Counter retryCount; + + /** + * Number of successes with only the first attempt + */ + final Counter successCount; + + /** + * Number of operations that failed permanently and exhausted all retries + */ + final Counter permFailureCount; + + /** + * Total time spent in each IO; this is updated only + * when at least one retry has been attempted. + */ + final Timer retryTimer; + + public RetryMetrics(String prefix, TableMetricsUtil metricsUtil) { + retryCount = metricsUtil.newCounter(prefix + "-retry-count"); + successCount = metricsUtil.newCounter(prefix + "-success-count"); + permFailureCount = metricsUtil.newCounter(prefix + "-perm-failure-count"); + retryTimer = metricsUtil.newTimer(prefix + "-retry-timer"); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java new file mode 100644 index 0000000000..162eb07ed6 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.samza.table.retry; + +import java.io.Serializable; +import java.time.Duration; +import java.util.function.Predicate; + +import com.google.common.base.Preconditions; + + +/** + * Common retry policy parameters for table IO. This serves as an abstraction on top of + * retry libraries. This common policy supports below features: + * - backoff modes: fixed, random, exponential + * - termination modes: by attempts, by duration + * - jitter + * + * Retry libraries can implement a subset or all features as described by this common policy. + */ +public class TableRetryPolicy implements Serializable { + enum BackoffType { + /** + * No backoff in between two retry attempts. + */ + NONE, + + /** + * Backoff by a fixed duration {@code sleepTime}. + */ + FIXED, + + /** + * Backoff by a randomly selected duration between {@code minSleep} and {@code maxSleep}. + */ + RANDOM, + + /** + * Backoff by exponentially increasing durations by {@code exponentialFactor} starting from {@code sleepTime}. + */ + EXPONENTIAL + } + + // Backoff parameters + private Duration sleepTime; + private Duration randomMin; + private Duration randomMax; + private double exponentialFactor; + private Duration exponentialMaxSleep; + private Duration jitter; + + // By default no early termination + private Integer maxAttempts = null; + private Duration maxDuration = null; + + // By default no backoff during retries + private BackoffType backoffType = BackoffType.NONE; + + /** + * Serializable adapter interface for {@link java.util.function.Predicate}. + * This is needed because TableRetryPolicy needs to be serializable as part of the + * table config whereas {@link java.util.function.Predicate} is not serializable. 
+ */ + public interface RetryPredicate extends Predicate, Serializable { + } + + // By default no custom retry predicate so retry decision is made solely by the table functions + private RetryPredicate retryPredicate = (ex) -> false; + + /** + * Set the sleepTime time for the fixed backoff policy. + * @param sleepTime sleepTime time + * @return this policy instance + */ + public TableRetryPolicy withFixedBackoff(Duration sleepTime) { + Preconditions.checkNotNull(sleepTime); + this.sleepTime = sleepTime; + this.backoffType = BackoffType.FIXED; + return this; + } + + /** + * Set the sleepTime time for the random backoff policy. The actual sleepTime time + * before each attempt is randomly selected between {@code [minSleep, maxSleep]} + * @param minSleep lower bound sleepTime time + * @param maxSleep upper bound sleepTime time + * @return this policy instance + */ + public TableRetryPolicy withRandomBackoff(Duration minSleep, Duration maxSleep) { + Preconditions.checkNotNull(minSleep); + Preconditions.checkNotNull(maxSleep); + this.randomMin = minSleep; + this.randomMax = maxSleep; + this.backoffType = BackoffType.RANDOM; + return this; + } + + /** + * Set the parameters for the exponential backoff policy. The actual sleepTime time + * is exponentially incremented up to the {@code maxSleep} and multiplying + * successive delays by the {@code factor}. + * @param sleepTime initial sleepTime time + * @param maxSleep upper bound sleepTime time + * @param factor exponential factor for backoff + * @return this policy instance + */ + public TableRetryPolicy withExponentialBackoff(Duration sleepTime, Duration maxSleep, double factor) { + Preconditions.checkNotNull(sleepTime); + Preconditions.checkNotNull(maxSleep); + this.sleepTime = sleepTime; + this.exponentialMaxSleep = maxSleep; + this.exponentialFactor = factor; + this.backoffType = BackoffType.EXPONENTIAL; + return this; + } + + /** + * Set the jitter for the backoff policy to provide additional randomness. 
+ * If this is set, a random value between {@code [0, jitter]} will be added + * to each sleep time. This applies to {@code FIXED} and {@code EXPONENTIAL} + * modes only. + * @param jitter maximum jitter applied to each sleep time + * @return this policy instance + */ + public TableRetryPolicy withJitter(Duration jitter) { + Preconditions.checkNotNull(jitter); + if (backoffType != BackoffType.RANDOM) { + this.jitter = jitter; + } + return this; + } + + /** + * Set maximum number of attempts before terminating the operation. + * @param maxAttempts number of attempts + * @return this policy instance + */ + public TableRetryPolicy withStopAfterAttempts(int maxAttempts) { + Preconditions.checkArgument(maxAttempts >= 0); + this.maxAttempts = maxAttempts; + return this; + } + + /** + * Set maximum total delay (sleep time + execution) before terminating the operation. + * @param maxDelay delay time + * @return this policy instance + */ + public TableRetryPolicy withStopAfterDelay(Duration maxDelay) { + Preconditions.checkNotNull(maxDelay); + this.maxDuration = maxDelay; + return this; + } + + /** + * Set the predicate to use for identifying retriable exceptions. If specified, table + * retry logic will consult both this predicate and the table function, and retry will be + * attempted if either option returns true. + * @param retryPredicate predicate for retriable exception identification + * @return this policy instance + */ + public TableRetryPolicy withRetryPredicate(RetryPredicate retryPredicate) { + Preconditions.checkNotNull(retryPredicate); + this.retryPredicate = retryPredicate; + return this; + } + + /** + * @return initial/fixed sleep time. + */ + public Duration getSleepTime() { + return sleepTime; + } + + /** + * @return lower bound of the sleep time for random backoff, or null if random backoff has not been configured. 
+ */ + public Duration getRandomMin() { + return randomMin; + } + + /** + * @return upper bound of the sleep time for random backoff or null if {@code backoffType} is not {@code RANDOM}. + */ + public Duration getRandomMax() { + return randomMax; + } + + /** + * @return exponential factor for exponential backoff. + */ + public double getExponentialFactor() { + return exponentialFactor; + } + + /** + * @return maximum sleep time for exponential backoff or null if {@code backoffType} is not {@code EXPONENTIAL}. + */ + public Duration getExponentialMaxSleep() { + return exponentialMaxSleep; + } + + /** + * Introduce randomness to the sleep time. + * @return jitter to add on to each backoff or null if not set. + */ + public Duration getJitter() { + return jitter; + } + + /** + * Termination after a fixed number of attempts. + * @return maximum number of attempts without success before giving up the operation or null if not set. + */ + public Integer getMaxAttempts() { + return maxAttempts; + } + + /** + * Termination after a fixed duration. + * @return maximum duration without success before giving up the operation or null if not set. + */ + public Duration getMaxDuration() { + return maxDuration; + } + + /** + * @return type of the backoff. + */ + public BackoffType getBackoffType() { + return backoffType; + } + + /** + * @return custom predicate for retriable exception identification; defaults to a predicate that always returns false.
+ */ + public RetryPredicate getRetryPredicate() { + return retryPredicate; + } +} diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java index 21fc6a5752..3e844c32b8 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.metrics.Counter; @@ -34,6 +35,9 @@ import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.retry.RetriableReadFunction; +import org.apache.samza.table.retry.RetriableWriteFunction; +import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.task.TaskContext; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -43,6 +47,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -50,6 +55,18 @@ public class TestRemoteTable { + private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor(); + + public static TaskContext getMockTaskContext() { + MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString()); + doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), 
anyString()); + doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any()); + TaskContext taskContext = mock(TaskContext.class); + doReturn(metricsRegistry).when(taskContext).getMetricsRegistry(); + return taskContext; + } + private > T getTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn) { return getTable(tableId, readFn, writeFn, null); @@ -72,12 +89,7 @@ private > T getTable(String tableId, table = new RemoteReadWriteTable(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor); } - TaskContext taskContext = mock(TaskContext.class); - MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString()); - doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString()); - doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any()); - doReturn(metricsRegistry).when(taskContext).getMetricsRegistry(); + TaskContext taskContext = getMockTaskContext(); SamzaContainerContext containerContext = mock(SamzaContainerContext.class); @@ -86,35 +98,53 @@ private > T getTable(String tableId, return (T) table; } - private void doTestGet(boolean sync, boolean error) throws Exception { + private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception { + String tableId = "testGet-" + sync + error + retry; TableReadFunction readFn = mock(TableReadFunction.class); // Sync is backed by async so needs to mock the async method CompletableFuture future; if (error) { future = new CompletableFuture(); future.completeExceptionally(new RuntimeException("Test exception")); + if (!retry) { + doReturn(future).when(readFn).getAsync(anyString()); + } else { + final int [] times = new int[] {0}; + doAnswer(args -> times[0]++ == 0 ? 
future : CompletableFuture.completedFuture("bar")) + .when(readFn).getAsync(anyString()); + } } else { future = CompletableFuture.completedFuture("bar"); + doReturn(future).when(readFn).getAsync(anyString()); } - doReturn(future).when(readFn).getAsync(anyString()); - RemoteReadableTable table = getTable("testGet-" + sync + error, readFn, null); + if (retry) { + doReturn(true).when(readFn).isRetriable(any()); + TableRetryPolicy policy = new TableRetryPolicy(); + readFn = new RetriableReadFunction<>(policy, readFn, schedExec); + } + RemoteReadableTable table = getTable(tableId, readFn, null); Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get()); verify(table.readRateLimiter, times(1)).throttle(anyString()); } @Test public void testGet() throws Exception { - doTestGet(true, false); + doTestGet(true, false, false); } @Test public void testGetAsync() throws Exception { - doTestGet(false, false); + doTestGet(false, false, false); } @Test(expected = ExecutionException.class) public void testGetAsyncError() throws Exception { - doTestGet(false, true); + doTestGet(false, true, false); + } + + @Test + public void testGetAsyncErrorRetried() throws Exception { + doTestGet(false, true, true); } @Test @@ -139,23 +169,36 @@ public void testGetMultipleTables() { }); } - private void doTestPut(boolean sync, boolean error, boolean isDelete) throws Exception { - TableWriteFunction writeFn = mock(TableWriteFunction.class); - RemoteReadWriteTable table = getTable("testPut-" + sync + error + isDelete, - mock(TableReadFunction.class), writeFn); - CompletableFuture future; - if (error) { - future = new CompletableFuture(); - future.completeExceptionally(new RuntimeException("Test exception")); + private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception { + String tableId = "testPut-" + sync + error + isDelete + retry; + TableWriteFunction mockWriteFn = mock(TableWriteFunction.class); + TableWriteFunction writeFn = 
mockWriteFn; + CompletableFuture successFuture = CompletableFuture.completedFuture(null); + CompletableFuture failureFuture = new CompletableFuture(); + failureFuture.completeExceptionally(new RuntimeException("Test exception")); + if (!error) { + if (isDelete) { + doReturn(successFuture).when(writeFn).deleteAsync(any()); + } else { + doReturn(successFuture).when(writeFn).putAsync(any(), any()); + } + } else if (!retry) { + if (isDelete) { + doReturn(failureFuture).when(writeFn).deleteAsync(any()); + } else { + doReturn(failureFuture).when(writeFn).putAsync(any(), any()); + } } else { - future = CompletableFuture.completedFuture(null); - } - // Sync is backed by async so needs to mock the async method - if (isDelete) { - doReturn(future).when(writeFn).deleteAsync(any()); - } else { - doReturn(future).when(writeFn).putAsync(any(), any()); + doReturn(true).when(writeFn).isRetriable(any()); + final int [] times = new int[] {0}; + if (isDelete) { + doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any()); + } else { + doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any()); + } + writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec); } + RemoteReadWriteTable table = getTable(tableId, mock(TableReadFunction.class), writeFn); if (sync) { table.put("foo", isDelete ? null : "bar"); } else { @@ -164,9 +207,9 @@ private void doTestPut(boolean sync, boolean error, boolean isDelete) throws Exc ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(String.class); ArgumentCaptor valCaptor = ArgumentCaptor.forClass(String.class); if (isDelete) { - verify(writeFn, times(1)).deleteAsync(keyCaptor.capture()); + verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture()); } else { - verify(writeFn, times(1)).putAsync(keyCaptor.capture(), valCaptor.capture()); + verify(mockWriteFn, times(retry ? 
2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture()); Assert.assertEquals("bar", valCaptor.getValue()); } Assert.assertEquals("foo", keyCaptor.getValue()); @@ -179,27 +222,32 @@ private void doTestPut(boolean sync, boolean error, boolean isDelete) throws Exc @Test public void testPut() throws Exception { - doTestPut(true, false, false); + doTestPut(true, false, false, false); } @Test public void testPutDelete() throws Exception { - doTestPut(true, false, true); + doTestPut(true, false, true, false); } @Test public void testPutAsync() throws Exception { - doTestPut(false, false, false); + doTestPut(false, false, false, false); } @Test public void testPutAsyncDelete() throws Exception { - doTestPut(false, false, true); + doTestPut(false, false, true, false); } @Test(expected = ExecutionException.class) public void testPutAsyncError() throws Exception { - doTestPut(false, true, false); + doTestPut(false, true, false, false); + } + + @Test + public void testPutAsyncErrorRetried() throws Exception { + doTestPut(false, true, false, true); } private void doTestDelete(boolean sync, boolean error) throws Exception { diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java index e30da12525..efe1acf229 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java @@ -31,6 +31,9 @@ import org.apache.samza.metrics.Timer; import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.retry.RetriableReadFunction; +import org.apache.samza.table.retry.RetriableWriteFunction; +import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.task.TaskContext; import org.apache.samza.util.EmbeddedTaggedRateLimiter; import org.apache.samza.util.RateLimiter; @@ -138,7 
+141,9 @@ public int getCredits(K key, V value) { private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean rlGets, boolean rlPuts) { int numRateLimitOps = (rlGets ? 1 : 0) + (rlPuts ? 1 : 0); RemoteTableDescriptor desc = new RemoteTableDescriptor("1"); - desc.withReadFunction(mock(TableReadFunction.class)); + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withRetryPredicate((ex) -> false); + desc.withReadFunction(mock(TableReadFunction.class), retryPolicy); desc.withWriteFunction(mock(TableWriteFunction.class)); desc.withAsyncCallbackExecutorPoolSize(10); @@ -178,6 +183,9 @@ private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean r ThreadPoolExecutor callbackExecutor = (ThreadPoolExecutor) rwTable.callbackExecutor; Assert.assertEquals(10, callbackExecutor.getCorePoolSize()); + + Assert.assertNotNull(rwTable.readFn instanceof RetriableReadFunction); + Assert.assertNotNull(!(rwTable.writeFn instanceof RetriableWriteFunction)); } @Test diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java new file mode 100644 index 0000000000..9dd5a746d1 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.Table; +import org.apache.samza.table.remote.TableReadFunction; +import org.apache.samza.table.remote.TableWriteFunction; +import org.apache.samza.table.remote.TestRemoteTable; +import org.apache.samza.table.utils.TableMetricsUtil; +import org.apache.samza.task.TaskContext; +import org.junit.Test; + +import junit.framework.Assert; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + + +public class TestRetriableTableFunctions { + private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor(); + + public TableMetricsUtil getMetricsUtil(String tableId) { + Table table = mock(Table.class); + SamzaContainerContext cntCtx = mock(SamzaContainerContext.class); + TaskContext taskCtx = 
TestRemoteTable.getMockTaskContext(); + return new TableMetricsUtil(cntCtx, taskCtx, table, tableId); + } + + @Test + public void testFirstTimeSuccessGet() throws Exception { + String tableId = "testFirstTimeSuccessGet"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(100)); + TableReadFunction readFn = mock(TableReadFunction.class); + doReturn(true).when(readFn).isRetriable(any()); + doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString()); + RetriableReadFunction retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + Assert.assertEquals("bar", retryIO.getAsync("foo").get()); + verify(readFn, times(1)).getAsync(anyString()); + + Assert.assertEquals(0, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(1, retryIO.retryMetrics.successCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.retryTimer.getSnapshot().getMax()); + } + + @Test + public void testRetryEngagedGet() throws Exception { + String tableId = "testRetryEngagedGet"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(10)); + TableReadFunction readFn = mock(TableReadFunction.class); + doReturn(true).when(readFn).isRetriable(any()); + + int [] times = new int[] {0}; + Map map = new HashMap<>(); + map.put("foo1", "bar1"); + map.put("foo2", "bar2"); + doAnswer(invocation -> { + CompletableFuture> future = new CompletableFuture(); + if (times[0] > 0) { + future.complete(map); + } else { + times[0]++; + future.completeExceptionally(new RuntimeException("test exception")); + } + return future; + }).when(readFn).getAllAsync(any()); + + RetriableReadFunction retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + Assert.assertEquals(map, retryIO.getAllAsync(Arrays.asList("foo1", "foo2")).get()); + verify(readFn, times(2)).getAllAsync(any()); + + 
Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testRetryExhaustedTimeGet() throws Exception { + String tableId = "testRetryExhaustedTime"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(5)); + policy.withStopAfterDelay(Duration.ofMillis(100)); + TableReadFunction readFn = mock(TableReadFunction.class); + doReturn(true).when(readFn).isRetriable(any()); + + CompletableFuture future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("test exception")); + doReturn(future).when(readFn).getAsync(anyString()); + + RetriableReadFunction retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + try { + retryIO.getAsync("foo").get(); + Assert.fail(); + } catch (ExecutionException e) { + } + + // Conservatively: must be at least 3 attempts with 5ms backoff and 100ms maxDelay + verify(readFn, atLeast(3)).getAsync(anyString()); + Assert.assertTrue(retryIO.retryMetrics.retryCount.getCount() >= 3); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testRetryExhaustedAttemptsGet() throws Exception { + String tableId = "testRetryExhaustedAttempts"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(5)); + policy.withStopAfterAttempts(10); + TableReadFunction readFn = mock(TableReadFunction.class); + doReturn(true).when(readFn).isRetriable(any()); + + CompletableFuture future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("test exception")); + doReturn(future).when(readFn).getAllAsync(any()); + + RetriableReadFunction retryIO = new 
RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + try { + retryIO.getAllAsync(Arrays.asList("foo1", "foo2")).get(); + Assert.fail(); + } catch (ExecutionException e) { + } + + // 1 initial try + 10 retries + verify(readFn, times(11)).getAllAsync(any()); + Assert.assertEquals(10, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testFirstTimeSuccessPut() throws Exception { + String tableId = "testFirstTimeSuccessPut"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(100)); + TableWriteFunction writeFn = mock(TableWriteFunction.class); + doReturn(true).when(writeFn).isRetriable(any()); + doReturn(CompletableFuture.completedFuture("bar")).when(writeFn).putAsync(anyString(), anyString()); + RetriableWriteFunction retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + retryIO.putAsync("foo", "bar").get(); + verify(writeFn, times(1)).putAsync(anyString(), anyString()); + + Assert.assertEquals(0, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(1, retryIO.retryMetrics.successCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.retryTimer.getSnapshot().getMax()); + } + + @Test + public void testRetryEngagedPut() throws Exception { + String tableId = "testRetryEngagedPut"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(10)); + TableWriteFunction writeFn = mock(TableWriteFunction.class); + doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAllAsync(any()); + doReturn(true).when(writeFn).isRetriable(any()); + + int [] times = new int[] {0}; + List> records = new ArrayList<>(); + records.add(new Entry<>("foo1", "bar1")); + records.add(new Entry<>("foo2", 
"bar2")); + doAnswer(invocation -> { + CompletableFuture> future = new CompletableFuture(); + if (times[0] > 0) { + future.complete(null); + } else { + times[0]++; + future.completeExceptionally(new RuntimeException("test exception")); + } + return future; + }).when(writeFn).putAllAsync(any()); + + RetriableWriteFunction retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + retryIO.putAllAsync(records).get(); + verify(writeFn, times(2)).putAllAsync(any()); + + Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testRetryExhaustedTimePut() throws Exception { + String tableId = "testRetryExhaustedTimePut"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(5)); + policy.withStopAfterDelay(Duration.ofMillis(100)); + TableWriteFunction writeFn = mock(TableWriteFunction.class); + doReturn(true).when(writeFn).isRetriable(any()); + + CompletableFuture future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("test exception")); + doReturn(future).when(writeFn).deleteAsync(anyString()); + + RetriableWriteFunction retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + try { + retryIO.deleteAsync("foo").get(); + Assert.fail(); + } catch (ExecutionException e) { + } + + // Conservatively: must be at least 3 attempts with 5ms backoff and 100ms maxDelay + verify(writeFn, atLeast(3)).deleteAsync(anyString()); + Assert.assertTrue(retryIO.retryMetrics.retryCount.getCount() >= 3); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testRetryExhaustedAttemptsPut() 
throws Exception { + String tableId = "testRetryExhaustedAttemptsPut"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(5)); + policy.withStopAfterAttempts(10); + TableWriteFunction writeFn = mock(TableWriteFunction.class); + doReturn(true).when(writeFn).isRetriable(any()); + + CompletableFuture future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("test exception")); + doReturn(future).when(writeFn).deleteAllAsync(any()); + + RetriableWriteFunction retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + try { + retryIO.deleteAllAsync(Arrays.asList("foo1", "foo2")).get(); + Assert.fail(); + } catch (ExecutionException e) { + } + + // 1 initial try + 10 retries + verify(writeFn, times(11)).deleteAllAsync(any()); + Assert.assertEquals(10, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testMixedIsRetriablePredicates() throws Exception { + String tableId = "testMixedIsRetriablePredicates"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(100)); + + // Retry should be attempted based on the custom classification, ie. 
retry on NPE + policy.withRetryPredicate(ex -> ex instanceof NullPointerException); + + TableReadFunction readFn = mock(TableReadFunction.class); + + // Table fn classification only retries on IllegalArgumentException + doAnswer(arg -> arg.getArgumentAt(0, Throwable.class) instanceof IllegalArgumentException).when(readFn).isRetriable(any()); + + int [] times = new int[1]; + doAnswer(arg -> { + if (times[0]++ == 0) { + CompletableFuture future = new CompletableFuture(); + future.completeExceptionally(new NullPointerException("test exception")); + return future; + } else { + return CompletableFuture.completedFuture("bar"); + } + }).when(readFn).getAsync(any()); + + RetriableReadFunction retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + + Assert.assertEquals("bar", retryIO.getAsync("foo").get()); + + verify(readFn, times(2)).getAsync(anyString()); + Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + +} diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java new file mode 100644 index 0000000000..c343d63936 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.time.Duration; + +import org.junit.Assert; +import org.junit.Test; + +import net.jodah.failsafe.RetryPolicy; + + +/** Unit tests for TableRetryPolicy settings and for their conversion to a Failsafe RetryPolicy through FailsafeAdapter.valueOf. */ public class TestTableRetryPolicy { + /** A newly constructed policy reports BackoffType.NONE. */ @Test + public void testNoRetry() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + Assert.assertEquals(TableRetryPolicy.BackoffType.NONE, retryPolicy.getBackoffType()); + } + + /** Fixed backoff: the 1000 ms delay, 100 ms jitter and stop-after-attempts value of 4 are expected back from the converted Failsafe policy as delay, jitter and max retries. The retry predicate is expected to be non-null even though none was set here. */ @Test + public void testFixedRetry() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withFixedBackoff(Duration.ofMillis(1000)); + retryPolicy.withJitter(Duration.ofMillis(100)); + retryPolicy.withStopAfterAttempts(4); + Assert.assertEquals(TableRetryPolicy.BackoffType.FIXED, retryPolicy.getBackoffType()); + RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy); + Assert.assertEquals(1000, fsRetry.getDelay().toMillis()); + Assert.assertEquals(100, fsRetry.getJitter().toMillis()); + Assert.assertEquals(4, fsRetry.getMaxRetries()); + Assert.assertNotNull(retryPolicy.getRetryPredicate()); + } + + /** Random backoff: the 1000 ms and 2000 ms bounds are expected back as the min and max delay of the converted Failsafe policy. The jitter call is marked as a no-op by the test itself and is not asserted. */ @Test + public void testRandomRetry() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withRandomBackoff(Duration.ofMillis(1000), Duration.ofMillis(2000)); + retryPolicy.withJitter(Duration.ofMillis(100)); // no-op + Assert.assertEquals(TableRetryPolicy.BackoffType.RANDOM, retryPolicy.getBackoffType()); + RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy); + Assert.assertEquals(1000, fsRetry.getDelayMin().toMillis()); + Assert.assertEquals(2000, fsRetry.getDelayMax().toMillis()); + } + + /** Exponential backoff: the 1000 ms initial delay, 2000 ms max delay, factor 1.5 and 100 ms jitter are expected back from the converted Failsafe policy. */ @Test + public
void testExponentialRetry() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withExponentialBackoff(Duration.ofMillis(1000), Duration.ofMillis(2000), 1.5); + retryPolicy.withJitter(Duration.ofMillis(100)); + Assert.assertEquals(TableRetryPolicy.BackoffType.EXPONENTIAL, retryPolicy.getBackoffType()); + RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy); + Assert.assertEquals(1000, fsRetry.getDelay().toMillis()); + Assert.assertEquals(2000, fsRetry.getMaxDelay().toMillis()); + Assert.assertEquals(1.5, fsRetry.getDelayFactor(), 0.001); + Assert.assertEquals(100, fsRetry.getJitter().toMillis()); + } + + /** A predicate installed with withRetryPredicate is the one returned by getRetryPredicate: it accepts IllegalArgumentException and rejects NullPointerException. */ @Test + public void testCustomRetryPredicate() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withRetryPredicate((e) -> e instanceof IllegalArgumentException); + Assert.assertTrue(retryPolicy.getRetryPredicate().test(new IllegalArgumentException())); + Assert.assertFalse(retryPolicy.getRetryPredicate().test(new NullPointerException())); + } +} diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java index e6e73bb4d9..6f62782dd1 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java @@ -93,6 +93,11 @@ public CompletableFuture getAsync(Integer key) { return CompletableFuture.completedFuture(profileMap.get(key)); } + @Override + public boolean isRetriable(Throwable exception) { + return false; + } + static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) { return new InMemoryReadFunction(serializedProfiles); } @@ -125,6 +130,11 @@ public CompletableFuture deleteAsync(Integer key) { records.remove(key); return CompletableFuture.completedFuture(null); } + + @Override + public boolean isRetriable(Throwable exception) { + return false; + } } private Table> getCachingTable(Table>
actualTable, boolean defaultCache, String id, StreamGraph streamGraph) { @@ -143,6 +153,18 @@ private Table> getCachingTable(Table> actualTable, bool return streamGraph.getTable(cachingDesc); } + /** Placeholder TableReadFunction used as the dummy reader of the output table descriptor: getAsync returns null and isRetriable returns false for every exception. */ static class MyReadFunction implements TableReadFunction { + @Override + public CompletableFuture getAsync(Object key) { + return null; + } + + @Override + public boolean isRetriable(Throwable exception) { + return false; + } + } + private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, String testName) throws Exception { final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName); @@ -168,9 +190,12 @@ private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean default .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles)) .withRateLimiter(readRateLimiter, null, null); + // dummy reader + TableReadFunction readFn = new MyReadFunction(); + RemoteTableDescriptor outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1"); outputTableDesc - .withReadFunction(key -> null) // dummy reader + .withReadFunction(readFn) .withWriteFunction(writer) .withRateLimiter(writeRateLimiter, null, null); diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java index 41b6509fc9..34ffbd49fd 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java @@ -24,6 +24,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; + import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.ConfigRewriter; @@ -116,15 +118,27 @@ public void testWithTableDescriptorsProviderClass() throws Exception { public static class
MySampleNonTableDescriptorsProvider { } + /** Stub TableReadFunction passed to the remote-table-1 descriptor in place of the former lambda: getAsync returns null and isRetriable returns false for every exception. */ static class MyReadFunction implements TableReadFunction { + @Override + public CompletableFuture getAsync(Object key) { + return null; + } + + @Override + public boolean isRetriable(Throwable exception) { + return false; + } + } + public static class MySampleTableDescriptorsProvider implements TableDescriptorsProvider { @Override public List getTableDescriptors(Config config) { List tableDescriptors = new ArrayList<>(); final RateLimiter readRateLimiter = mock(RateLimiter.class); - final TableReadFunction readRemoteTable = (TableReadFunction) key -> null; + final MyReadFunction readFn = new MyReadFunction(); tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1") - .withReadFunction(readRemoteTable) + .withReadFunction(readFn) .withRateLimiter(readRateLimiter, null, null) .withSerde(KVSerde.of(new StringSerde(), new LongSerde()))); tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")