From ca959912d1c0f3a6c7a2752e81d260b501dd75a8 Mon Sep 17 00:00:00 2001 From: Peng Du Date: Tue, 28 Aug 2018 12:29:25 -0700 Subject: [PATCH 1/4] table: add common retry for IO functions Add common retry functionality to table IO functions for data stores that do not have native retry support. We use failsafe as the retry library. --- build.gradle | 1 + gradle/dependency-versions.gradle | 1 + .../table/remote/RemoteReadWriteTable.java | 2 +- .../table/remote/RemoteReadableTable.java | 2 +- .../table/remote/RemoteTableDescriptor.java | 46 ++- .../table/remote/RemoteTableProvider.java | 42 ++- .../samza/table/remote/TableReadFunction.java | 11 + .../table/remote/TableWriteFunction.java | 11 + .../table/retry/RetriableReadFunction.java | 113 +++++++ .../table/retry/RetriableWriteFunction.java | 131 +++++++++ .../samza/table/retry/RetryMetrics.java | 53 ++++ .../samza/table/retry/TableRetryPolicy.java | 169 +++++++++++ .../samza/table/remote/TestRemoteTable.java | 116 +++++--- .../remote/TestRemoteTableDescriptor.java | 8 +- .../retry/TestRetriableTableFunctions.java | 278 ++++++++++++++++++ 15 files changed, 943 insertions(+), 41 deletions(-) create mode 100644 samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java create mode 100644 samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java create mode 100644 samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java create mode 100644 samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java create mode 100644 samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java diff --git a/build.gradle b/build.gradle index 7d23717f93..8b682050bb 100644 --- a/build.gradle +++ b/build.gradle @@ -184,6 +184,7 @@ project(":samza-core_$scalaVersion") { compile "org.eclipse.jetty:jetty-webapp:$jettyVersion" compile "org.scala-lang:scala-library:$scalaLibVersion" compile "org.slf4j:slf4j-api:$slf4jVersion" + compile "net.jodah:failsafe:$failsafeVersion" testCompile project(":samza-api").sourceSets.test.output testCompile "junit:junit:$junitVersion" testCompile "org.mockito:mockito-core:$mockitoVersion" diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle index 3c4c3a9303..b33ab82e9a 100644 --- a/gradle/dependency-versions.gradle +++ b/gradle/dependency-versions.gradle @@ -45,4 +45,5 @@ yarnVersion = "2.6.1" zkClientVersion = "0.8" zookeeperVersion = "3.4.6" + failsafeVersion = "1.1.0" } diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java index 88bc7df33f..9ef4c1baa2 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadWriteTable.java @@ -43,11 +43,11 @@ * @param the type of the value in this table */ public class RemoteReadWriteTable extends RemoteReadableTable implements ReadWriteTable { - private final TableWriteFunction writeFn; private DefaultTableWriteMetrics writeMetrics; @VisibleForTesting + final TableWriteFunction writeFn; final TableRateLimiter writeRateLimiter; public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn, diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java index 3186fee94a..b3d82f3463 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteReadableTable.java @@ -80,10 +80,10 @@ public class RemoteReadableTable implements ReadableTable { protected final ExecutorService callbackExecutor; protected final ExecutorService tableExecutor; - private final TableReadFunction readFn; private DefaultTableReadMetrics readMetrics; @VisibleForTesting + final TableReadFunction readFn; final TableRateLimiter readRateLimiter; /** diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java index a8d419d1f3..537ff878f7 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableDescriptor.java @@ -24,6 +24,7 @@ import org.apache.samza.operators.BaseTableDescriptor; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.table.utils.SerdeUtils; import org.apache.samza.util.EmbeddedTaggedRateLimiter; import org.apache.samza.util.RateLimiter; @@ -70,6 +71,9 @@ public class RemoteTableDescriptor extends BaseTableDescriptor readCreditFn; private TableRateLimiter.CreditFunction writeCreditFn; + private TableRetryPolicy readRetryPolicy; + private TableRetryPolicy writeRetryPolicy; + // By default execute future callbacks on the native client threads // ie. no additional thread pool for callbacks. private int asyncCallbackPoolSize = -1; @@ -115,13 +119,23 @@ public TableSpec getTableSpec() { "write credit function", writeCreditFn)); } + if (readRetryPolicy != null) { + tableSpecConfig.put(RemoteTableProvider.READ_RETRY_POLICY, SerdeUtils.serialize( + "read retry policy", readRetryPolicy)); + } + + if (writeRetryPolicy != null) { + tableSpecConfig.put(RemoteTableProvider.WRITE_RETRY_POLICY, SerdeUtils.serialize( + "write retry policy", writeRetryPolicy)); + } + tableSpecConfig.put(RemoteTableProvider.ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize)); return new TableSpec(tableId, serde, RemoteTableProviderFactory.class.getName(), tableSpecConfig); } /** - * Use specified TableReadFunction with remote table. + * Use specified TableReadFunction with remote table and a retry policy. * @param readFn read function instance * @return this table descriptor instance */ @@ -132,7 +146,7 @@ public RemoteTableDescriptor withReadFunction(TableReadFunction read } /** - * Use specified TableWriteFunction with remote table. + * Use specified TableWriteFunction with remote table and a retry policy. * @param writeFn write function instance * @return this table descriptor instance */ @@ -142,6 +156,34 @@ public RemoteTableDescriptor withWriteFunction(TableWriteFunction wr return this; } + /** + * Use specified TableReadFunction with remote table. + * @param readFn read function instance + * @param retryPolicy retry policy for the read function + * @return this table descriptor instance + */ + public RemoteTableDescriptor withReadFunction(TableReadFunction readFn, TableRetryPolicy retryPolicy) { + Preconditions.checkNotNull(readFn, "null read function"); + Preconditions.checkNotNull(retryPolicy, "null retry policy"); + this.readFn = readFn; + this.readRetryPolicy = retryPolicy; + return this; + } + + /** + * Use specified TableWriteFunction with remote table. + * @param writeFn write function instance + * @param retryPolicy retry policy for the write function + * @return this table descriptor instance + */ + public RemoteTableDescriptor withWriteFunction(TableWriteFunction writeFn, TableRetryPolicy retryPolicy) { + Preconditions.checkNotNull(writeFn, "null write function"); + Preconditions.checkNotNull(retryPolicy, "null retry policy"); + this.writeFn = writeFn; + this.writeRetryPolicy = retryPolicy; + return this; + } + /** * Specify a rate limiter along with credit functions to map a table record (as KV) to the amount * of credits to be charged from the rate limiter for table read and write operations. diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java index 6c5d9b38ed..e476d77b4c 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java @@ -25,11 +25,16 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.retry.RetriableReadFunction; +import org.apache.samza.table.retry.RetriableWriteFunction; +import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.table.utils.BaseTableProvider; import org.apache.samza.table.utils.SerdeUtils; +import org.apache.samza.table.utils.TableMetricsUtil; import org.apache.samza.util.RateLimiter; import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG; @@ -47,6 +52,8 @@ public class RemoteTableProvider extends BaseTableProvider { static final String READ_CREDIT_FN = "io.read.credit.func"; static final String WRITE_CREDIT_FN = "io.write.credit.func"; static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size"; + static final String READ_RETRY_POLICY = "io.read.retry.policy"; + static final String WRITE_RETRY_POLICY = "io.write.retry.policy"; private final boolean readOnly; private final List> tables = new ArrayList<>(); @@ -58,6 +65,7 @@ public class RemoteTableProvider extends BaseTableProvider { */ private static Map tableExecutors = new ConcurrentHashMap<>(); private static Map callbackExecutors = new ConcurrentHashMap<>(); + private static ScheduledExecutorService retryExecutor; public RemoteTableProvider(TableSpec tableSpec) { super(tableSpec); @@ -72,7 +80,7 @@ public Table getTable() { RemoteReadableTable table; String tableId = tableSpec.getId(); - TableReadFunction readFn = getReadFn(); + TableReadFunction readFn = getReadFn(); RateLimiter rateLimiter = deserializeObject(RATE_LIMITER); if (rateLimiter != null) { rateLimiter.init(containerContext.config, taskContext); @@ -83,11 +91,33 @@ public Table getTable() { TableRateLimiter.CreditFunction writeCreditFn; TableRateLimiter writeRateLimiter = null; + TableRetryPolicy readRetryPolicy = deserializeObject(READ_RETRY_POLICY); + TableRetryPolicy writeRetryPolicy = null; + + if (readRetryPolicy != null || writeRetryPolicy != null) { + retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> { + Thread thread = new Thread(runnable); + thread.setName("table-retry-executor"); + thread.setDaemon(true); + return thread; + }); + } + + if (readRetryPolicy != null) { + readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor); + } + + TableWriteFunction writeFn = getWriteFn(); + boolean isRateLimited = readRateLimiter.isRateLimited(); if (!readOnly) { writeCreditFn = deserializeObject(WRITE_CREDIT_FN); writeRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, writeCreditFn, RL_WRITE_TAG); isRateLimited |= writeRateLimiter.isRateLimited(); + writeRetryPolicy = deserializeObject(WRITE_RETRY_POLICY); + if (writeRetryPolicy != null) { + writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor); + } } // Optional executor for future callback/completion. Shared by both read and write operations. @@ -116,10 +146,18 @@ public Table getTable() { table = new RemoteReadableTable(tableSpec.getId(), readFn, readRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId)); } else { - table = new RemoteReadWriteTable(tableSpec.getId(), readFn, getWriteFn(), readRateLimiter, + table = new RemoteReadWriteTable(tableSpec.getId(), readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId)); } + TableMetricsUtil metricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId); + if (readRetryPolicy != null) { + ((RetriableReadFunction) readFn).setMetrics(metricsUtil); + } + if (writeRetryPolicy != null) { + ((RetriableWriteFunction) writeFn).setMetrics(metricsUtil); + } + table.init(containerContext, taskContext); tables.add(table); return table; diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java index 5d0f96319e..8b33a943b7 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java @@ -100,5 +100,16 @@ default CompletableFuture> getAllAsync(Collection keys) { .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().join()))); } + /** + * Determine whether the current operation can be retried with the last thrown exception. + * The default implementation disables retries for any type of exceptions. This can be + * overridden to customize the policy based on the data store protocol. + * @param exception exception thrown by a table operation + * @return whether the operation can be retried + */ + default boolean isRetriable(Throwable exception) { + return false; + } + // optionally implement readObject() to initialize transient states } diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java index 0ac3a0c204..b1676241a0 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java @@ -142,6 +142,17 @@ default CompletableFuture deleteAllAsync(Collection keys) { return CompletableFuture.allOf(Iterables.toArray(deleteFutures, CompletableFuture.class)); } + /** + * Determine whether the current operation can be retried with the last thrown exception. + * The default implementation disables retries for any type of exceptions. This can be + * overridden to customize the policy based on the data store protocol. + * @param exception exception thrown by a table operation + * @return whether the operation can be retried + */ + default boolean isRetriable(Throwable exception) { + return false; + } + /** * Flush the remote store (optional) */ diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java new file mode 100644 index 0000000000..7fae45c7e7 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.samza.SamzaException; +import org.apache.samza.table.remote.TableReadFunction; +import org.apache.samza.table.utils.TableMetricsUtil; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import net.jodah.failsafe.AsyncFailsafe; +import net.jodah.failsafe.Failsafe; +import net.jodah.failsafe.RetryPolicy; + + +/** + * Wrapper for a {@link TableReadFunction} instance to add common retry + * support with a {@link TableRetryPolicy}. This wrapper is created by + * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry + * policy is specified together with the {@link TableReadFunction}. + * + * Actual retry mechanism is provided by the failsafe library. Retry is + * attempted in an async way with a {@link ScheduledExecutorService}. + * + * @param the type of the key in this table + * @param the type of the value in this table + */ +public class RetriableReadFunction implements TableReadFunction { + private final RetryPolicy retryPolicy; + private final TableReadFunction readFn; + private final ScheduledExecutorService retryExecutor; + + @VisibleForTesting + RetryMetrics retryMetrics; + + public RetriableReadFunction(TableRetryPolicy policy, TableReadFunction readFn, + ScheduledExecutorService retryExecutor) { + Preconditions.checkNotNull(policy); + Preconditions.checkNotNull(readFn); + Preconditions.checkNotNull(retryExecutor); + + this.readFn = readFn; + this.retryExecutor = retryExecutor; + this.retryPolicy = policy.toFailsafePolicy(readFn::isRetriable); + } + + @Override + public CompletableFuture getAsync(K key) { + return getFailsafe() + .future(k -> readFn.getAsync(key)) + .exceptionally(e -> { + throw new SamzaException("Failed to get the record for " + key + " after retries.", e); + }); + } + + @Override + public CompletableFuture> getAllAsync(Collection keys) { + return getFailsafe() + .future(k -> readFn.getAllAsync(keys)) + .exceptionally(e -> { + throw new SamzaException("Failed to get the records for " + keys + " after retries.", e); + }); + } + + private AsyncFailsafe getFailsafe() { + long startMs = System.currentTimeMillis(); + return Failsafe.with(retryPolicy).with(retryExecutor) + .onRetry(e -> retryMetrics.retryCount.inc()) + .onRetriesExceeded(e -> retryMetrics.retryTimer.update(System.currentTimeMillis() - startMs)) + .onSuccess((e, ctx) -> { + if (ctx.getExecutions() > 1) { + retryMetrics.retryTimer.update(System.currentTimeMillis() - startMs); + } else { + retryMetrics.successCount.inc(); + } + }); + } + + @Override + public boolean isRetriable(Throwable exception) { + return readFn.isRetriable(exception); + } + + /** + * Initialize retry-related metrics + * @param metricsUtil metrics util + */ + public void setMetrics(TableMetricsUtil metricsUtil) { + this.retryMetrics = new RetryMetrics("reader", metricsUtil); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java new file mode 100644 index 0000000000..e90f5b4b13 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.samza.SamzaException; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.remote.TableWriteFunction; +import org.apache.samza.table.utils.TableMetricsUtil; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import net.jodah.failsafe.AsyncFailsafe; +import net.jodah.failsafe.Failsafe; +import net.jodah.failsafe.RetryPolicy; + + +/** + * Wrapper for a {@link TableWriteFunction} instance to add common retry + * support with a {@link TableRetryPolicy}. This wrapper is created by + * {@link org.apache.samza.table.remote.RemoteTableProvider} when a retry + * policy is specified together with the {@link TableWriteFunction}. + * + * Actual retry mechanism is provided by the failsafe library. Retry is + * attempted in an async way with a {@link ScheduledExecutorService}. + * + * @param the type of the key in this table + * @param the type of the value in this table + */ +public class RetriableWriteFunction implements TableWriteFunction { + private final RetryPolicy retryPolicy; + private final TableWriteFunction writeFn; + private final ScheduledExecutorService retryExecutor; + + @VisibleForTesting + RetryMetrics retryMetrics; + + public RetriableWriteFunction(TableRetryPolicy policy, TableWriteFunction writeFn, + ScheduledExecutorService retryExecutor) { + Preconditions.checkNotNull(policy); + Preconditions.checkNotNull(writeFn); + Preconditions.checkNotNull(retryExecutor); + + this.writeFn = writeFn; + this.retryExecutor = retryExecutor; + this.retryPolicy = policy.toFailsafePolicy(writeFn::isRetriable); + } + + @Override + public CompletableFuture putAsync(K key, V record) { + return failsafe() + .future(k -> writeFn.putAsync(key, record)) + .exceptionally(e -> { + throw new SamzaException("Failed to get the record for " + key + " after retries.", e); + }); + } + + @Override + public CompletableFuture putAllAsync(Collection> records) { + return failsafe() + .future(k -> writeFn.putAllAsync(records)) + .exceptionally(e -> { + throw new SamzaException("Failed to put records after retries.", e); + }); + } + + @Override + public CompletableFuture deleteAsync(K key) { + return failsafe() + .future(k -> writeFn.deleteAsync(key)) + .exceptionally(e -> { + throw new SamzaException("Failed to delete the record for " + key + " after retries.", e); + }); + } + + @Override + public CompletableFuture deleteAllAsync(Collection keys) { + return failsafe() + .future(k -> writeFn.deleteAllAsync(keys)) + .exceptionally(e -> { + throw new SamzaException("Failed to delete the records for " + keys + " after retries.", e); + }); + } + + @Override + public boolean isRetriable(Throwable exception) { + return writeFn.isRetriable(exception); + } + + private AsyncFailsafe failsafe() { + long startMs = System.currentTimeMillis(); + return Failsafe.with(retryPolicy).with(retryExecutor) + .onRetry(e -> retryMetrics.retryCount.inc()) + .onRetriesExceeded(e -> retryMetrics.retryTimer.update(System.currentTimeMillis() - startMs)) + .onSuccess((e, ctx) -> { + if (ctx.getExecutions() > 1) { + retryMetrics.retryTimer.update(System.currentTimeMillis() - startMs); + } else { + retryMetrics.successCount.inc(); + } + }); + } + + /** + * Initialize retry-related metrics. + * @param metricsUtil metrics util + */ + public void setMetrics(TableMetricsUtil metricsUtil) { + this.retryMetrics = new RetryMetrics("writer", metricsUtil); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java new file mode 100644 index 0000000000..4e0654a792 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import org.apache.samza.metrics.Counter; +import org.apache.samza.metrics.Timer; +import org.apache.samza.table.utils.TableMetricsUtil; + + +/** + * Wrapper of retry-related metrics common to both {@link RetriableReadFunction} and + * {@link RetriableWriteFunction}. + */ +class RetryMetrics { + /** + * Number of retries executed (excluding the first attempt) + */ + final Counter retryCount; + + /** + * Number of successes with only the first attempt + */ + final Counter successCount; + + /** + * Total time spent in each IO; this is updated only + * when at least one retries have been attempted. + */ + final Timer retryTimer; + + public RetryMetrics(String prefix, TableMetricsUtil metricsUtil) { + retryCount = metricsUtil.newCounter(prefix + "-retry-count"); + successCount = metricsUtil.newCounter(prefix + "-success-count"); + retryTimer = metricsUtil.newTimer(prefix + "-retry-timer"); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java new file mode 100644 index 0000000000..4267e94a6b --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.io.Serializable; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + +import org.apache.samza.SamzaException; + +import net.jodah.failsafe.RetryPolicy; + + +/** + * Common retry policy parameters for table IO. This serves as an abstraction on top of + * retry libraries. This common policy supports below features: + * - backoff modes: fixed, random, exponential + * - termination modes: by attempts, by duration + * - jitter + * + * Retry libraries can implement a subset or all features as described by this common policy. + * Currently, the policy object can be translated into {@link RetryPolicy} of failsafe library. + */ +public class TableRetryPolicy implements Serializable { + enum BackoffType { NONE, FIXED, RANDOM, EXPONENTIAL } + + // Backoff parameters + private long sleepMs; + private long randomMinMs; + private long randomMaxMs; + private double exponentialFactor; + private long exponentialMaxSleepMs; + private long jitterMs; + + // By default no early termination + private int maxAttempts = -1; + private long maxDelayMs = Long.MAX_VALUE; + + // By default no backoff during retries + private BackoffType backoffType = BackoffType.NONE; + + /** + * Set the sleep time for the fixed backoff policy. + * @param sleepMs sleep time in milliseconds + * @return this policy instance + */ + public TableRetryPolicy withFixedBackoff(long sleepMs) { + this.sleepMs = sleepMs; + this.backoffType = BackoffType.FIXED; + return this; + } + + /** + * Set the sleep time for the random backoff policy. The actual sleep time + * before each attempt is randomly selected between {@code [minSleepMs, maxSleepMs]} + * @param minSleepMs lower bound sleep time in milliseconds + * @param maxSleepMs upper bound sleep time in milliseconds + * @return this policy instance + */ + public TableRetryPolicy withRandomBackoff(long minSleepMs, long maxSleepMs) { + this.randomMinMs = minSleepMs; + this.randomMaxMs = maxSleepMs; + this.backoffType = BackoffType.RANDOM; + return this; + } + + /** + * Set the parameters for the exponential backoff policy. The actual sleep time + * is exponentially incremented up to the {@code maxSleepMs} and multiplying + * successive delays by the {@code factor}. + * @param sleepMs initial sleep time in milliseconds + * @param maxSleepMs upper bound sleep time in milliseconds + * @param factor exponential factor for backoff + * @return this policy instance + */ + public TableRetryPolicy withExponentialBackoff(long sleepMs, long maxSleepMs, double factor) { + this.sleepMs = sleepMs; + this.exponentialMaxSleepMs = maxSleepMs; + this.exponentialFactor = factor; + this.backoffType = BackoffType.EXPONENTIAL; + return this; + } + + /** + * Set the jitter for the backoff policy to provide additional randomness. + * If this is set, a random value between {@code [0, jitter]} will be added + * to each sleep time. + * @param jitterMs initial sleep time in milliseconds + * @return this policy instance + */ + public TableRetryPolicy withJitter(long jitterMs) { + this.jitterMs = jitterMs; + return this; + } + + /** + * Set maximum number of attempts before terminating the operation. + * @param maxAttempts number of attempts + * @return this policy instance + */ + public TableRetryPolicy withStopAfterAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + /** + * Set maximum total delay (sleep + execution) before terminating the operation. + * @param maxDelayMs delay time in milliseconds + * @return this policy instance + */ + public TableRetryPolicy withStopAfterDelay(long maxDelayMs) { + this.maxDelayMs = maxDelayMs; + return this; + } + + /** + * Convert the TableRetryPolicy to failsafe {@link RetryPolicy}. + * @param isRetriable predicate to signal retriable exceptions. + * @return this policy instance + */ + RetryPolicy toFailsafePolicy(Predicate isRetriable) { + RetryPolicy policy = new RetryPolicy(); + policy.withMaxDuration(maxDelayMs, TimeUnit.MILLISECONDS); + policy.withMaxRetries(maxAttempts); + if (jitterMs != 0) { + policy.withJitter(jitterMs, TimeUnit.MILLISECONDS); + } + policy.retryOn((e) -> isRetriable.test(e)); + + switch (backoffType) { + case NONE: + break; + + case FIXED: + policy.withDelay(sleepMs, TimeUnit.MILLISECONDS); + break; + + case RANDOM: + policy.withDelay(randomMinMs, randomMaxMs, TimeUnit.MILLISECONDS); + break; + + case EXPONENTIAL: + policy.withBackoff(sleepMs, exponentialMaxSleepMs, TimeUnit.MILLISECONDS, exponentialFactor); + break; + + default: + throw new SamzaException("Unknown retry policy type."); + } + + return policy; + } +} diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java index 21fc6a5752..3e844c32b8 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTable.java @@ -27,6 +27,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import org.apache.samza.container.SamzaContainerContext; import org.apache.samza.metrics.Counter; @@ -34,6 +35,9 @@ import org.apache.samza.metrics.MetricsRegistry; import org.apache.samza.metrics.Timer; import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.retry.RetriableReadFunction; +import org.apache.samza.table.retry.RetriableWriteFunction; +import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.task.TaskContext; import org.junit.Test; import org.mockito.ArgumentCaptor; @@ -43,6 +47,7 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -50,6 +55,18 @@ public class TestRemoteTable { + private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor(); + + public static TaskContext getMockTaskContext() { + MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); + doAnswer(args -> new Timer((String) args.getArguments()[0])).when(metricsRegistry).newTimer(anyString(), anyString()); + doAnswer(args -> new Counter((String) args.getArguments()[0])).when(metricsRegistry).newCounter(anyString(), anyString()); + doAnswer(args -> new Gauge((String) args.getArguments()[0], 0)).when(metricsRegistry).newGauge(anyString(), any()); + TaskContext taskContext = mock(TaskContext.class); + doReturn(metricsRegistry).when(taskContext).getMetricsRegistry(); + return taskContext; + } + private > T getTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn) { return getTable(tableId, readFn, writeFn, null); @@ -72,12 +89,7 @@ private > T getTable(String tableId, table = new RemoteReadWriteTable(tableId, readFn, writeFn, readRateLimiter, writeRateLimiter, tableExecutor, cbExecutor); } - TaskContext taskContext = mock(TaskContext.class); - MetricsRegistry metricsRegistry = mock(MetricsRegistry.class); - doReturn(mock(Timer.class)).when(metricsRegistry).newTimer(anyString(), anyString()); - doReturn(mock(Counter.class)).when(metricsRegistry).newCounter(anyString(), anyString()); - doReturn(mock(Gauge.class)).when(metricsRegistry).newGauge(anyString(), any()); - doReturn(metricsRegistry).when(taskContext).getMetricsRegistry(); + TaskContext taskContext = getMockTaskContext(); SamzaContainerContext containerContext = mock(SamzaContainerContext.class); @@ -86,35 +98,53 @@ private > T getTable(String tableId, return (T) table; } - private void doTestGet(boolean sync, boolean error) throws Exception { + private void doTestGet(boolean sync, boolean error, boolean retry) throws Exception { + String tableId = "testGet-" + sync + error + retry; TableReadFunction readFn = mock(TableReadFunction.class); // Sync is backed by async so needs to mock the async method CompletableFuture future; if (error) { future = new CompletableFuture(); future.completeExceptionally(new RuntimeException("Test exception")); + if (!retry) { + doReturn(future).when(readFn).getAsync(anyString()); + } else { + final int [] times = new int[] {0}; + doAnswer(args -> times[0]++ == 0 ? future : CompletableFuture.completedFuture("bar")) + .when(readFn).getAsync(anyString()); + } } else { future = CompletableFuture.completedFuture("bar"); + doReturn(future).when(readFn).getAsync(anyString()); } - doReturn(future).when(readFn).getAsync(anyString()); - RemoteReadableTable table = getTable("testGet-" + sync + error, readFn, null); + if (retry) { + doReturn(true).when(readFn).isRetriable(any()); + TableRetryPolicy policy = new TableRetryPolicy(); + readFn = new RetriableReadFunction<>(policy, readFn, schedExec); + } + RemoteReadableTable table = getTable(tableId, readFn, null); Assert.assertEquals("bar", sync ? table.get("foo") : table.getAsync("foo").get()); verify(table.readRateLimiter, times(1)).throttle(anyString()); } @Test public void testGet() throws Exception { - doTestGet(true, false); + doTestGet(true, false, false); } @Test public void testGetAsync() throws Exception { - doTestGet(false, false); + doTestGet(false, false, false); } @Test(expected = ExecutionException.class) public void testGetAsyncError() throws Exception { - doTestGet(false, true); + doTestGet(false, true, false); + } + + @Test + public void testGetAsyncErrorRetried() throws Exception { + doTestGet(false, true, true); } @Test @@ -139,23 +169,36 @@ public void testGetMultipleTables() { }); } - private void doTestPut(boolean sync, boolean error, boolean isDelete) throws Exception { - TableWriteFunction writeFn = mock(TableWriteFunction.class); - RemoteReadWriteTable table = getTable("testPut-" + sync + error + isDelete, - mock(TableReadFunction.class), writeFn); - CompletableFuture future; - if (error) { - future = new CompletableFuture(); - future.completeExceptionally(new RuntimeException("Test exception")); + private void doTestPut(boolean sync, boolean error, boolean isDelete, boolean retry) throws Exception { + String tableId = "testPut-" + sync + error + isDelete + retry; + TableWriteFunction mockWriteFn = mock(TableWriteFunction.class); + TableWriteFunction writeFn = mockWriteFn; + CompletableFuture successFuture = CompletableFuture.completedFuture(null); + CompletableFuture failureFuture = new CompletableFuture(); + failureFuture.completeExceptionally(new RuntimeException("Test exception")); + if (!error) { + if (isDelete) { + doReturn(successFuture).when(writeFn).deleteAsync(any()); + } else { + doReturn(successFuture).when(writeFn).putAsync(any(), any()); + } + } else if (!retry) { + if (isDelete) { + doReturn(failureFuture).when(writeFn).deleteAsync(any()); + } else { + doReturn(failureFuture).when(writeFn).putAsync(any(), any()); + } } else { - future = CompletableFuture.completedFuture(null); - } - // Sync is backed by async so needs to mock the async method - if (isDelete) { - doReturn(future).when(writeFn).deleteAsync(any()); - } else { - doReturn(future).when(writeFn).putAsync(any(), any()); + doReturn(true).when(writeFn).isRetriable(any()); + final int [] times = new int[] {0}; + if (isDelete) { + doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).deleteAsync(any()); + } else { + doAnswer(args -> times[0]++ == 0 ? failureFuture : successFuture).when(writeFn).putAsync(any(), any()); + } + writeFn = new RetriableWriteFunction<>(new TableRetryPolicy(), writeFn, schedExec); } + RemoteReadWriteTable table = getTable(tableId, mock(TableReadFunction.class), writeFn); if (sync) { table.put("foo", isDelete ? null : "bar"); } else { @@ -164,9 +207,9 @@ private void doTestPut(boolean sync, boolean error, boolean isDelete) throws Exc ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(String.class); ArgumentCaptor valCaptor = ArgumentCaptor.forClass(String.class); if (isDelete) { - verify(writeFn, times(1)).deleteAsync(keyCaptor.capture()); + verify(mockWriteFn, times(1)).deleteAsync(keyCaptor.capture()); } else { - verify(writeFn, times(1)).putAsync(keyCaptor.capture(), valCaptor.capture()); + verify(mockWriteFn, times(retry ? 2 : 1)).putAsync(keyCaptor.capture(), valCaptor.capture()); Assert.assertEquals("bar", valCaptor.getValue()); } Assert.assertEquals("foo", keyCaptor.getValue()); @@ -179,27 +222,32 @@ private void doTestPut(boolean sync, boolean error, boolean isDelete) throws Exc @Test public void testPut() throws Exception { - doTestPut(true, false, false); + doTestPut(true, false, false, false); } @Test public void testPutDelete() throws Exception { - doTestPut(true, false, true); + doTestPut(true, false, true, false); } @Test public void testPutAsync() throws Exception { - doTestPut(false, false, false); + doTestPut(false, false, false, false); } @Test public void testPutAsyncDelete() throws Exception { - doTestPut(false, false, true); + doTestPut(false, false, true, false); } @Test(expected = ExecutionException.class) public void testPutAsyncError() throws Exception { - doTestPut(false, true, false); + doTestPut(false, true, false, false); + } + + @Test + public void testPutAsyncErrorRetried() throws Exception { + doTestPut(false, true, false, true); } private void doTestDelete(boolean sync, boolean error) throws Exception { diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java index e30da12525..6093173c6b 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java @@ -31,6 +31,9 @@ import org.apache.samza.metrics.Timer; import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; +import org.apache.samza.table.retry.RetriableReadFunction; +import org.apache.samza.table.retry.RetriableWriteFunction; +import org.apache.samza.table.retry.TableRetryPolicy; import org.apache.samza.task.TaskContext; import org.apache.samza.util.EmbeddedTaggedRateLimiter; import org.apache.samza.util.RateLimiter; @@ -138,7 +141,7 @@ public int getCredits(K key, V value) { private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean rlGets, boolean rlPuts) { int numRateLimitOps = (rlGets ? 1 : 0) + (rlPuts ? 1 : 0); RemoteTableDescriptor desc = new RemoteTableDescriptor("1"); - desc.withReadFunction(mock(TableReadFunction.class)); + desc.withReadFunction(mock(TableReadFunction.class), new TableRetryPolicy()); desc.withWriteFunction(mock(TableWriteFunction.class)); desc.withAsyncCallbackExecutorPoolSize(10); @@ -178,6 +181,9 @@ private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean r ThreadPoolExecutor callbackExecutor = (ThreadPoolExecutor) rwTable.callbackExecutor; Assert.assertEquals(10, callbackExecutor.getCorePoolSize()); + + Assert.assertNotNull(rwTable.readFn instanceof RetriableReadFunction); + Assert.assertNotNull(!(rwTable.writeFn instanceof RetriableWriteFunction)); } @Test diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java new file mode 100644 index 0000000000..8b99d545b3 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.samza.container.SamzaContainerContext; +import org.apache.samza.storage.kv.Entry; +import org.apache.samza.table.Table; +import org.apache.samza.table.remote.TableReadFunction; +import org.apache.samza.table.remote.TableWriteFunction; +import org.apache.samza.table.remote.TestRemoteTable; +import org.apache.samza.table.utils.TableMetricsUtil; +import org.apache.samza.task.TaskContext; +import org.junit.Test; + +import junit.framework.Assert; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + + +public class TestRetriableTableFunctions { + private final ScheduledExecutorService schedExec = Executors.newSingleThreadScheduledExecutor(); + + public TableMetricsUtil getMetricsUtil(String tableId) { + Table table = mock(Table.class); + SamzaContainerContext cntCtx = mock(SamzaContainerContext.class); + TaskContext taskCtx = TestRemoteTable.getMockTaskContext(); + return new TableMetricsUtil(cntCtx, taskCtx, table, tableId); + } + + @Test + public void testFirstTimeSuccessGet() throws Exception { + String tableId = "testFirstTimeSuccessGet"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(100); + TableReadFunction readFn = mock(TableReadFunction.class); + doReturn(true).when(readFn).isRetriable(any()); + doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString()); + RetriableReadFunction retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + Assert.assertEquals("bar", retryIO.getAsync("foo").get()); + verify(readFn, times(1)).getAsync(anyString()); + + Assert.assertEquals(0, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(1, retryIO.retryMetrics.successCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.retryTimer.getSnapshot().getMax()); + } + + @Test + public void testRetryEngagedGet() throws Exception { + String tableId = "testRetryEngagedGet"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(10); + TableReadFunction readFn = mock(TableReadFunction.class); + doReturn(true).when(readFn).isRetriable(any()); + + int [] times = new int[] {0}; + Map map = new HashMap<>(); + map.put("foo1", "bar1"); + map.put("foo2", "bar2"); + doAnswer(invocation -> { + CompletableFuture> future = new CompletableFuture(); + if (times[0] > 0) { + future.complete(map); + } else { + times[0]++; + future.completeExceptionally(new RuntimeException("test exception")); + } + return future; + }).when(readFn).getAllAsync(any()); + + RetriableReadFunction retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + Assert.assertEquals(map, retryIO.getAllAsync(Arrays.asList("foo1", "foo2")).get()); + verify(readFn, times(2)).getAllAsync(any()); + + Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testRetryExhaustedTimeGet() throws Exception { + String tableId = "testRetryExhaustedTime"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(5); + policy.withStopAfterDelay(100); + TableReadFunction readFn = mock(TableReadFunction.class); + doReturn(true).when(readFn).isRetriable(any()); + + CompletableFuture future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("test exception")); + doReturn(future).when(readFn).getAsync(anyString()); + + RetriableReadFunction retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + try { + retryIO.getAsync("foo").get(); + Assert.fail(); + } catch (ExecutionException e) { + } + + // Conservatively: must be at least 3 attempts with 5ms backoff and 100ms maxDelay + verify(readFn, atLeast(3)).getAsync(anyString()); + Assert.assertTrue(retryIO.retryMetrics.retryCount.getCount() >= 3); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testRetryExhaustedAttemptsGet() throws Exception { + String tableId = "testRetryExhaustedAttempts"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(5); + policy.withStopAfterAttempts(10); + TableReadFunction readFn = mock(TableReadFunction.class); + doReturn(true).when(readFn).isRetriable(any()); + + CompletableFuture future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("test exception")); + doReturn(future).when(readFn).getAllAsync(any()); + + RetriableReadFunction retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + try { + retryIO.getAllAsync(Arrays.asList("foo1", "foo2")).get(); + Assert.fail(); + } catch (ExecutionException e) { + } + + // 1 initial try + 10 retries + verify(readFn, times(11)).getAllAsync(any()); + Assert.assertEquals(10, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testFirstTimeSuccessPut() throws Exception { + String tableId = "testFirstTimeSuccessPut"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(100); + TableWriteFunction writeFn = mock(TableWriteFunction.class); + doReturn(true).when(writeFn).isRetriable(any()); + doReturn(CompletableFuture.completedFuture("bar")).when(writeFn).putAsync(anyString(), anyString()); + RetriableWriteFunction retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + retryIO.putAsync("foo", "bar").get(); + verify(writeFn, times(1)).putAsync(anyString(), anyString()); + + Assert.assertEquals(0, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(1, retryIO.retryMetrics.successCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.retryTimer.getSnapshot().getMax()); + } + + @Test + public void testRetryEngagedPut() throws Exception { + String tableId = "testRetryEngagedPut"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(10); + TableWriteFunction writeFn = mock(TableWriteFunction.class); + doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAllAsync(any()); + doReturn(true).when(writeFn).isRetriable(any()); + + int [] times = new int[] {0}; + List> records = new ArrayList<>(); + records.add(new Entry<>("foo1", "bar1")); + records.add(new Entry<>("foo2", "bar2")); + doAnswer(invocation -> { + CompletableFuture> future = new CompletableFuture(); + if (times[0] > 0) { + future.complete(null); + } else { + times[0]++; + future.completeExceptionally(new RuntimeException("test exception")); + } + return future; + }).when(writeFn).putAllAsync(any()); + + RetriableWriteFunction retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + retryIO.putAllAsync(records).get(); + verify(writeFn, times(2)).putAllAsync(any()); + + Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testRetryExhaustedTimePut() throws Exception { + String tableId = "testRetryExhaustedTimePut"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(5); + policy.withStopAfterDelay(100); + TableWriteFunction writeFn = mock(TableWriteFunction.class); + doReturn(true).when(writeFn).isRetriable(any()); + + CompletableFuture future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("test exception")); + doReturn(future).when(writeFn).deleteAsync(anyString()); + + RetriableWriteFunction retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + try { + retryIO.deleteAsync("foo").get(); + Assert.fail(); + } catch (ExecutionException e) { + } + + // Conservatively: must be at least 3 attempts with 5ms backoff and 100ms maxDelay + verify(writeFn, atLeast(3)).deleteAsync(anyString()); + Assert.assertTrue(retryIO.retryMetrics.retryCount.getCount() >= 3); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + + @Test + public void testRetryExhaustedAttemptsPut() throws Exception { + String tableId = "testRetryExhaustedAttemptsPut"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(5); + policy.withStopAfterAttempts(10); + TableWriteFunction writeFn = mock(TableWriteFunction.class); + doReturn(true).when(writeFn).isRetriable(any()); + + CompletableFuture future = new CompletableFuture(); + future.completeExceptionally(new RuntimeException("test exception")); + doReturn(future).when(writeFn).deleteAllAsync(any()); + + RetriableWriteFunction retryIO = new RetriableWriteFunction<>(policy, writeFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + try { + retryIO.deleteAllAsync(Arrays.asList("foo1", "foo2")).get(); + Assert.fail(); + } catch (ExecutionException e) { + } + + // 1 initial try + 10 retries + verify(writeFn, times(11)).deleteAllAsync(any()); + Assert.assertEquals(10, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } +} From 8e19fc5d6a79f4bb4758205f50d966be4f344e17 Mon Sep 17 00:00:00 2001 From: Peng Du Date: Mon, 10 Sep 2018 14:22:49 -0700 Subject: [PATCH 2/4] Address review comments - extract failsafe logic into an adapter class from TableRetryPolicy - added a unit test class for TableRetryPolicy + FailsafeAdapter - use retry executor service as a singleton - added a permanent-failure metric --- .../table/remote/RemoteTableProvider.java | 2 +- .../samza/table/retry/FailsafeAdapter.java | 105 ++++++++++ .../table/retry/RetriableReadFunction.java | 24 +-- .../table/retry/RetriableWriteFunction.java | 28 +-- .../samza/table/retry/RetryMetrics.java | 6 + .../samza/table/retry/TableRetryPolicy.java | 182 +++++++++++------- .../retry/TestRetriableTableFunctions.java | 21 +- .../table/retry/TestTableRetryPolicy.java | 73 +++++++ 8 files changed, 322 insertions(+), 119 deletions(-) create mode 100644 samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java create mode 100644 samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java index e476d77b4c..cae0bbdef1 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/RemoteTableProvider.java @@ -94,7 +94,7 @@ public Table getTable() { TableRetryPolicy readRetryPolicy = deserializeObject(READ_RETRY_POLICY); TableRetryPolicy writeRetryPolicy = null; - if (readRetryPolicy != null || writeRetryPolicy != null) { + if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) { retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> { Thread thread = new Thread(runnable); thread.setName("table-retry-executor"); diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java new file mode 100644 index 0000000000..b5d64a3edb --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; + +import org.apache.samza.SamzaException; + +import net.jodah.failsafe.AsyncFailsafe; +import net.jodah.failsafe.Failsafe; +import net.jodah.failsafe.RetryPolicy; + + +/** + * Helper class adapting the generic {@link TableRetryPolicy} to a failsafe {@link RetryPolicy} and + * creating failsafe retryer instances with proper metrics management. + */ +class FailsafeAdapter { + /** + * Convert the {@link TableRetryPolicy} to failsafe {@link RetryPolicy}. + * @param isRetriable predicate to signal retriable exceptions. + * @return this policy instance + */ + static RetryPolicy valueOf(Predicate isRetriable, TableRetryPolicy policy) { + RetryPolicy failSafePolicy = new RetryPolicy(); + + switch (policy.getBackoffType()) { + case NONE: + break; + + case FIXED: + failSafePolicy.withDelay(policy.getSleepTime().toMillis(), TimeUnit.MILLISECONDS); + break; + + case RANDOM: + failSafePolicy.withDelay(policy.getRandomMin().toMillis(), policy.getRandomMax().toMillis(), TimeUnit.MILLISECONDS); + break; + + case EXPONENTIAL: + failSafePolicy.withBackoff(policy.getSleepTime().toMillis(), policy.getExponentialMaxSleep().toMillis(), TimeUnit.MILLISECONDS, + policy.getExponentialFactor()); + break; + + default: + throw new SamzaException("Unknown retry policy type."); + } + + if (policy.getMaxDuration() != null) { + failSafePolicy.withMaxDuration(policy.getMaxDuration().toMillis(), TimeUnit.MILLISECONDS); + } + if (policy.getMaxAttempts() != null) { + failSafePolicy.withMaxRetries(policy.getMaxAttempts()); + } + if (policy.getJitter() != null && policy.getBackoffType() != TableRetryPolicy.BackoffType.RANDOM) { + failSafePolicy.withJitter(policy.getJitter().toMillis(), TimeUnit.MILLISECONDS); + } + + failSafePolicy.retryOn((e) -> isRetriable.test(e)); + + return failSafePolicy; + } + + /** + * Obtain an async failsafe retryer instance with the specified policy, metrics, and executor service. + * @param retryPolicy retry policy + * @param metrics retry metrics + * @param retryExec executor service for scheduling async retries + * @return {@link net.jodah.failsafe.AsyncFailsafe} instance + */ + static AsyncFailsafe failsafe(RetryPolicy retryPolicy, RetryMetrics metrics, ScheduledExecutorService retryExec) { + long startMs = System.currentTimeMillis(); + return Failsafe.with(retryPolicy).with(retryExec) + .onRetry(e -> metrics.retryCount.inc()) + .onRetriesExceeded(e -> { + metrics.retryTimer.update(System.currentTimeMillis() - startMs); + metrics.permFailureCount.inc(); + }) + .onSuccess((e, ctx) -> { + if (ctx.getExecutions() > 1) { + metrics.retryTimer.update(System.currentTimeMillis() - startMs); + } else { + metrics.successCount.inc(); + } + }); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java index 7fae45c7e7..4a13b8620b 100644 --- a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java @@ -30,10 +30,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import net.jodah.failsafe.AsyncFailsafe; -import net.jodah.failsafe.Failsafe; import net.jodah.failsafe.RetryPolicy; +import static org.apache.samza.table.retry.FailsafeAdapter.failsafe; + /** * Wrapper for a {@link TableReadFunction} instance to add common retry @@ -63,12 +63,12 @@ public RetriableReadFunction(TableRetryPolicy policy, TableReadFunction re this.readFn = readFn; this.retryExecutor = retryExecutor; - this.retryPolicy = policy.toFailsafePolicy(readFn::isRetriable); + this.retryPolicy = FailsafeAdapter.valueOf((ex) -> readFn.isRetriable(ex), policy); } @Override public CompletableFuture getAsync(K key) { - return getFailsafe() + return failsafe(retryPolicy, retryMetrics, retryExecutor) .future(k -> readFn.getAsync(key)) .exceptionally(e -> { throw new SamzaException("Failed to get the record for " + key + " after retries.", e); @@ -77,27 +77,13 @@ public CompletableFuture getAsync(K key) { @Override public CompletableFuture> getAllAsync(Collection keys) { - return getFailsafe() + return failsafe(retryPolicy, retryMetrics, retryExecutor) .future(k -> readFn.getAllAsync(keys)) .exceptionally(e -> { throw new SamzaException("Failed to get the records for " + keys + " after retries.", e); }); } - private AsyncFailsafe getFailsafe() { - long startMs = System.currentTimeMillis(); - return Failsafe.with(retryPolicy).with(retryExecutor) - .onRetry(e -> retryMetrics.retryCount.inc()) - .onRetriesExceeded(e -> retryMetrics.retryTimer.update(System.currentTimeMillis() - startMs)) - .onSuccess((e, ctx) -> { - if (ctx.getExecutions() > 1) { - retryMetrics.retryTimer.update(System.currentTimeMillis() - startMs); - } else { - retryMetrics.successCount.inc(); - } - }); - } - @Override public boolean isRetriable(Throwable exception) { return readFn.isRetriable(exception); diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java index e90f5b4b13..5b72a134d8 100644 --- a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java @@ -30,10 +30,10 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import net.jodah.failsafe.AsyncFailsafe; -import net.jodah.failsafe.Failsafe; import net.jodah.failsafe.RetryPolicy; +import static org.apache.samza.table.retry.FailsafeAdapter.failsafe; + /** * Wrapper for a {@link TableWriteFunction} instance to add common retry @@ -63,12 +63,12 @@ public RetriableWriteFunction(TableRetryPolicy policy, TableWriteFunction this.writeFn = writeFn; this.retryExecutor = retryExecutor; - this.retryPolicy = policy.toFailsafePolicy(writeFn::isRetriable); + this.retryPolicy = FailsafeAdapter.valueOf((ex) -> writeFn.isRetriable(ex), policy); } @Override public CompletableFuture putAsync(K key, V record) { - return failsafe() + return failsafe(retryPolicy, retryMetrics, retryExecutor) .future(k -> writeFn.putAsync(key, record)) .exceptionally(e -> { throw new SamzaException("Failed to get the record for " + key + " after retries.", e); @@ -77,7 +77,7 @@ public CompletableFuture putAsync(K key, V record) { @Override public CompletableFuture putAllAsync(Collection> records) { - return failsafe() + return failsafe(retryPolicy, retryMetrics, retryExecutor) .future(k -> writeFn.putAllAsync(records)) .exceptionally(e -> { throw new SamzaException("Failed to put records after retries.", e); @@ -86,7 +86,7 @@ public CompletableFuture putAllAsync(Collection> records) { @Override public CompletableFuture deleteAsync(K key) { - return failsafe() + return failsafe(retryPolicy, retryMetrics, retryExecutor) .future(k -> writeFn.deleteAsync(key)) .exceptionally(e -> { throw new SamzaException("Failed to delete the record for " + key + " after retries.", e); @@ -95,7 +95,7 @@ public CompletableFuture deleteAsync(K key) { @Override public CompletableFuture deleteAllAsync(Collection keys) { - return failsafe() + return failsafe(retryPolicy, retryMetrics, retryExecutor) .future(k -> writeFn.deleteAllAsync(keys)) .exceptionally(e -> { throw new SamzaException("Failed to delete the records for " + keys + " after retries.", e); @@ -107,20 +107,6 @@ public boolean isRetriable(Throwable exception) { return writeFn.isRetriable(exception); } - private AsyncFailsafe failsafe() { - long startMs = System.currentTimeMillis(); - return Failsafe.with(retryPolicy).with(retryExecutor) - .onRetry(e -> retryMetrics.retryCount.inc()) - .onRetriesExceeded(e -> retryMetrics.retryTimer.update(System.currentTimeMillis() - startMs)) - .onSuccess((e, ctx) -> { - if (ctx.getExecutions() > 1) { - retryMetrics.retryTimer.update(System.currentTimeMillis() - startMs); - } else { - retryMetrics.successCount.inc(); - } - }); - } - /** * Initialize retry-related metrics. * @param metricsUtil metrics util diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java index 4e0654a792..fbc511c8b6 100644 --- a/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetryMetrics.java @@ -39,6 +39,11 @@ class RetryMetrics { */ final Counter successCount; + /** + * Number of operations that failed permanently and exhausted all retries + */ + final Counter permFailureCount; + /** * Total time spent in each IO; this is updated only * when at least one retries have been attempted. @@ -48,6 +53,7 @@ class RetryMetrics { public RetryMetrics(String prefix, TableMetricsUtil metricsUtil) { retryCount = metricsUtil.newCounter(prefix + "-retry-count"); successCount = metricsUtil.newCounter(prefix + "-success-count"); + permFailureCount = metricsUtil.newCounter(prefix + "-perm-failure-count"); retryTimer = metricsUtil.newTimer(prefix + "-retry-timer"); } } diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java index 4267e94a6b..4c522b3412 100644 --- a/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java +++ b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java @@ -20,12 +20,7 @@ package org.apache.samza.table.retry; import java.io.Serializable; -import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; - -import org.apache.samza.SamzaException; - -import net.jodah.failsafe.RetryPolicy; +import java.time.Duration; /** @@ -36,63 +31,82 @@ * - jitter * * Retry libraries can implement a subset or all features as described by this common policy. - * Currently, the policy object can be translated into {@link RetryPolicy} of failsafe library. */ public class TableRetryPolicy implements Serializable { - enum BackoffType { NONE, FIXED, RANDOM, EXPONENTIAL } + enum BackoffType { + /** + * No backoff in between two retry attempts. + */ + NONE, + + /** + * Backoff by a fixed duration {@code sleepTime}. + */ + FIXED, + + /** + * Backoff by a randomly selected duration between {@code minSleep} and {@code maxSleep}. + */ + RANDOM, + + /** + * Backoff by exponentially increasing durations by {@code exponentialFactor} starting from {@code sleepTime}. + */ + EXPONENTIAL + } // Backoff parameters - private long sleepMs; - private long randomMinMs; - private long randomMaxMs; + private Duration sleepTime; + private Duration randomMin; + private Duration randomMax; private double exponentialFactor; - private long exponentialMaxSleepMs; - private long jitterMs; + private Duration exponentialMaxSleep; + private Duration jitter; // By default no early termination - private int maxAttempts = -1; - private long maxDelayMs = Long.MAX_VALUE; + private Integer maxAttempts = null; + private Duration maxDuration = null; // By default no backoff during retries private BackoffType backoffType = BackoffType.NONE; /** - * Set the sleep time for the fixed backoff policy. - * @param sleepMs sleep time in milliseconds + * Set the sleepTime time for the fixed backoff policy. + * @param sleepTime sleepTime time * @return this policy instance */ - public TableRetryPolicy withFixedBackoff(long sleepMs) { - this.sleepMs = sleepMs; + public TableRetryPolicy withFixedBackoff(Duration sleepTime) { + this.sleepTime = sleepTime; this.backoffType = BackoffType.FIXED; return this; } /** - * Set the sleep time for the random backoff policy. The actual sleep time - * before each attempt is randomly selected between {@code [minSleepMs, maxSleepMs]} - * @param minSleepMs lower bound sleep time in milliseconds - * @param maxSleepMs upper bound sleep time in milliseconds + * Set the sleepTime time for the random backoff policy. The actual sleepTime time + * before each attempt is randomly selected between {@code [minSleep, maxSleep]} + * @param minSleep lower bound sleepTime time + * @param maxSleep upper bound sleepTime time * @return this policy instance */ - public TableRetryPolicy withRandomBackoff(long minSleepMs, long maxSleepMs) { - this.randomMinMs = minSleepMs; - this.randomMaxMs = maxSleepMs; + public TableRetryPolicy withRandomBackoff(Duration minSleep, Duration maxSleep) { + this.randomMin = minSleep; + this.randomMax = maxSleep; this.backoffType = BackoffType.RANDOM; return this; } /** - * Set the parameters for the exponential backoff policy. The actual sleep time - * is exponentially incremented up to the {@code maxSleepMs} and multiplying + * Set the parameters for the exponential backoff policy. The actual sleepTime time + * is exponentially incremented up to the {@code maxSleep} and multiplying * successive delays by the {@code factor}. - * @param sleepMs initial sleep time in milliseconds - * @param maxSleepMs upper bound sleep time in milliseconds + * @param sleepTime initial sleepTime time + * @param maxSleep upper bound sleepTime time * @param factor exponential factor for backoff * @return this policy instance */ - public TableRetryPolicy withExponentialBackoff(long sleepMs, long maxSleepMs, double factor) { - this.sleepMs = sleepMs; - this.exponentialMaxSleepMs = maxSleepMs; + public TableRetryPolicy withExponentialBackoff(Duration sleepTime, Duration maxSleep, double factor) { + this.sleepTime = sleepTime; + this.exponentialMaxSleep = maxSleep; this.exponentialFactor = factor; this.backoffType = BackoffType.EXPONENTIAL; return this; @@ -101,12 +115,15 @@ public TableRetryPolicy withExponentialBackoff(long sleepMs, long maxSleepMs, do /** * Set the jitter for the backoff policy to provide additional randomness. * If this is set, a random value between {@code [0, jitter]} will be added - * to each sleep time. - * @param jitterMs initial sleep time in milliseconds + * to each sleepTime time. This applies to {@code FIXED} and {@code EXPONENTIAL} + * modes only. + * @param jitter initial sleepTime time * @return this policy instance */ - public TableRetryPolicy withJitter(long jitterMs) { - this.jitterMs = jitterMs; + public TableRetryPolicy withJitter(Duration jitter) { + if (backoffType != BackoffType.RANDOM) { + this.jitter = jitter; + } return this; } @@ -121,49 +138,78 @@ public TableRetryPolicy withStopAfterAttempts(int maxAttempts) { } /** - * Set maximum total delay (sleep + execution) before terminating the operation. - * @param maxDelayMs delay time in milliseconds + * Set maximum total delay (sleepTime + execution) before terminating the operation. + * @param maxDelay delay time * @return this policy instance */ - public TableRetryPolicy withStopAfterDelay(long maxDelayMs) { - this.maxDelayMs = maxDelayMs; + public TableRetryPolicy withStopAfterDelay(Duration maxDelay) { + this.maxDuration = maxDelay; return this; } /** - * Convert the TableRetryPolicy to failsafe {@link RetryPolicy}. - * @param isRetriable predicate to signal retriable exceptions. - * @return this policy instance + * @return initial/fixed sleep time. */ - RetryPolicy toFailsafePolicy(Predicate isRetriable) { - RetryPolicy policy = new RetryPolicy(); - policy.withMaxDuration(maxDelayMs, TimeUnit.MILLISECONDS); - policy.withMaxRetries(maxAttempts); - if (jitterMs != 0) { - policy.withJitter(jitterMs, TimeUnit.MILLISECONDS); - } - policy.retryOn((e) -> isRetriable.test(e)); + public Duration getSleepTime() { + return sleepTime; + } - switch (backoffType) { - case NONE: - break; + /** + * @return lower sleepTime time for random backoff or null if {@code policyType} is not {@code RANDOM}. + */ + public Duration getRandomMin() { + return randomMin; + } - case FIXED: - policy.withDelay(sleepMs, TimeUnit.MILLISECONDS); - break; + /** + * @return upper sleepTime time for random backoff or null if {@code policyType} is not {@code RANDOM}. + */ + public Duration getRandomMax() { + return randomMax; + } - case RANDOM: - policy.withDelay(randomMinMs, randomMaxMs, TimeUnit.MILLISECONDS); - break; + /** + * @return exponential factor for exponential backoff. + */ + public double getExponentialFactor() { + return exponentialFactor; + } - case EXPONENTIAL: - policy.withBackoff(sleepMs, exponentialMaxSleepMs, TimeUnit.MILLISECONDS, exponentialFactor); - break; + /** + * @return maximum sleepTime time for exponential backoff or null if {@code policyType} is not {@code EXPONENTIAL}. + */ + public Duration getExponentialMaxSleep() { + return exponentialMaxSleep; + } - default: - throw new SamzaException("Unknown retry policy type."); - } + /** + * Introduce randomness to the sleepTime time. + * @return jitter to add on to each backoff or null if not set. + */ + public Duration getJitter() { + return jitter; + } + + /** + * Termination after a fix number of attempts. + * @return maximum number of attempts without success before giving up the operation. + */ + public Integer getMaxAttempts() { + return maxAttempts; + } + + /** + * Termination after a fixed duration. + * @return maximum duration without success before giving up the operation. + */ + public Duration getMaxDuration() { + return maxDuration; + } - return policy; + /** + * @return type of the backoff. + */ + public BackoffType getBackoffType() { + return backoffType; } } diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java index 8b99d545b3..2c6ca9edfb 100644 --- a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java +++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java @@ -19,6 +19,7 @@ package org.apache.samza.table.retry; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -65,7 +66,7 @@ public TableMetricsUtil getMetricsUtil(String tableId) { public void testFirstTimeSuccessGet() throws Exception { String tableId = "testFirstTimeSuccessGet"; TableRetryPolicy policy = new TableRetryPolicy(); - policy.withFixedBackoff(100); + policy.withFixedBackoff(Duration.ofMillis(100)); TableReadFunction readFn = mock(TableReadFunction.class); doReturn(true).when(readFn).isRetriable(any()); doReturn(CompletableFuture.completedFuture("bar")).when(readFn).getAsync(anyString()); @@ -83,7 +84,7 @@ public void testFirstTimeSuccessGet() throws Exception { public void testRetryEngagedGet() throws Exception { String tableId = "testRetryEngagedGet"; TableRetryPolicy policy = new TableRetryPolicy(); - policy.withFixedBackoff(10); + policy.withFixedBackoff(Duration.ofMillis(10)); TableReadFunction readFn = mock(TableReadFunction.class); doReturn(true).when(readFn).isRetriable(any()); @@ -116,8 +117,8 @@ public void testRetryEngagedGet() throws Exception { public void testRetryExhaustedTimeGet() throws Exception { String tableId = "testRetryExhaustedTime"; TableRetryPolicy policy = new TableRetryPolicy(); - policy.withFixedBackoff(5); - policy.withStopAfterDelay(100); + policy.withFixedBackoff(Duration.ofMillis(5)); + policy.withStopAfterDelay(Duration.ofMillis(100)); TableReadFunction readFn = mock(TableReadFunction.class); doReturn(true).when(readFn).isRetriable(any()); @@ -144,7 +145,7 @@ public void testRetryExhaustedTimeGet() throws Exception { public void testRetryExhaustedAttemptsGet() throws Exception { String tableId = "testRetryExhaustedAttempts"; TableRetryPolicy policy = new TableRetryPolicy(); - policy.withFixedBackoff(5); + policy.withFixedBackoff(Duration.ofMillis(5)); policy.withStopAfterAttempts(10); TableReadFunction readFn = mock(TableReadFunction.class); doReturn(true).when(readFn).isRetriable(any()); @@ -172,7 +173,7 @@ public void testRetryExhaustedAttemptsGet() throws Exception { public void testFirstTimeSuccessPut() throws Exception { String tableId = "testFirstTimeSuccessPut"; TableRetryPolicy policy = new TableRetryPolicy(); - policy.withFixedBackoff(100); + policy.withFixedBackoff(Duration.ofMillis(100)); TableWriteFunction writeFn = mock(TableWriteFunction.class); doReturn(true).when(writeFn).isRetriable(any()); doReturn(CompletableFuture.completedFuture("bar")).when(writeFn).putAsync(anyString(), anyString()); @@ -190,7 +191,7 @@ public void testFirstTimeSuccessPut() throws Exception { public void testRetryEngagedPut() throws Exception { String tableId = "testRetryEngagedPut"; TableRetryPolicy policy = new TableRetryPolicy(); - policy.withFixedBackoff(10); + policy.withFixedBackoff(Duration.ofMillis(10)); TableWriteFunction writeFn = mock(TableWriteFunction.class); doReturn(CompletableFuture.completedFuture(null)).when(writeFn).putAllAsync(any()); doReturn(true).when(writeFn).isRetriable(any()); @@ -224,8 +225,8 @@ public void testRetryEngagedPut() throws Exception { public void testRetryExhaustedTimePut() throws Exception { String tableId = "testRetryExhaustedTimePut"; TableRetryPolicy policy = new TableRetryPolicy(); - policy.withFixedBackoff(5); - policy.withStopAfterDelay(100); + policy.withFixedBackoff(Duration.ofMillis(5)); + policy.withStopAfterDelay(Duration.ofMillis(100)); TableWriteFunction writeFn = mock(TableWriteFunction.class); doReturn(true).when(writeFn).isRetriable(any()); @@ -252,7 +253,7 @@ public void testRetryExhaustedTimePut() throws Exception { public void testRetryExhaustedAttemptsPut() throws Exception { String tableId = "testRetryExhaustedAttemptsPut"; TableRetryPolicy policy = new TableRetryPolicy(); - policy.withFixedBackoff(5); + policy.withFixedBackoff(Duration.ofMillis(5)); policy.withStopAfterAttempts(10); TableWriteFunction writeFn = mock(TableWriteFunction.class); doReturn(true).when(writeFn).isRetriable(any()); diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java new file mode 100644 index 0000000000..2019294b32 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.table.retry; + +import java.time.Duration; + +import org.junit.Assert; +import org.junit.Test; + +import net.jodah.failsafe.RetryPolicy; + + +public class TestTableRetryPolicy { + @Test + public void testNoRetry() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + Assert.assertEquals(TableRetryPolicy.BackoffType.NONE, retryPolicy.getBackoffType()); + } + + @Test + public void testFixedRetry() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withFixedBackoff(Duration.ofMillis(1000)); + retryPolicy.withJitter(Duration.ofMillis(100)); + retryPolicy.withStopAfterAttempts(4); + Assert.assertEquals(TableRetryPolicy.BackoffType.FIXED, retryPolicy.getBackoffType()); + RetryPolicy fsRetry = FailsafeAdapter.valueOf((e) -> true, retryPolicy); + Assert.assertEquals(1000, fsRetry.getDelay().toMillis()); + Assert.assertEquals(100, fsRetry.getJitter().toMillis()); + Assert.assertEquals(4, fsRetry.getMaxRetries()); + } + + @Test + public void testRandomRetry() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withRandomBackoff(Duration.ofMillis(1000), Duration.ofMillis(2000)); + retryPolicy.withJitter(Duration.ofMillis(100)); // no-op + Assert.assertEquals(TableRetryPolicy.BackoffType.RANDOM, retryPolicy.getBackoffType()); + RetryPolicy fsRetry = FailsafeAdapter.valueOf((e) -> true, retryPolicy); + Assert.assertEquals(1000, fsRetry.getDelayMin().toMillis()); + Assert.assertEquals(2000, fsRetry.getDelayMax().toMillis()); + } + + @Test + public void testExponentialRetry() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withExponentialBackoff(Duration.ofMillis(1000), Duration.ofMillis(2000), 1.5); + retryPolicy.withJitter(Duration.ofMillis(100)); + Assert.assertEquals(TableRetryPolicy.BackoffType.EXPONENTIAL, retryPolicy.getBackoffType()); + RetryPolicy fsRetry = FailsafeAdapter.valueOf((e) -> true, retryPolicy); + Assert.assertEquals(1000, fsRetry.getDelay().toMillis()); + Assert.assertEquals(2000, fsRetry.getMaxDelay().toMillis()); + Assert.assertEquals(1.5, fsRetry.getDelayFactor(), 0.001); + Assert.assertEquals(100, fsRetry.getJitter().toMillis()); + } +} From 60b88026a11e290687103578f86206bdd04bcdb9 Mon Sep 17 00:00:00 2001 From: Peng Du Date: Tue, 11 Sep 2018 14:55:33 -0700 Subject: [PATCH 3/4] Add option for application to provide isRetriable predicate This allows the application to have a say on which exception types can be retried. Exception will be retried if either table function or the custom predicate say so. --- .../samza/table/remote/TableReadFunction.java | 6 +-- .../table/remote/TableWriteFunction.java | 6 +-- .../samza/table/retry/FailsafeAdapter.java | 6 +-- .../table/retry/RetriableReadFunction.java | 3 +- .../table/retry/RetriableWriteFunction.java | 3 +- .../samza/table/retry/TableRetryPolicy.java | 46 ++++++++++++++++++- .../remote/TestRemoteTableDescriptor.java | 4 +- .../retry/TestRetriableTableFunctions.java | 37 +++++++++++++++ .../table/retry/TestTableRetryPolicy.java | 15 ++++-- 9 files changed, 104 insertions(+), 22 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java index 8b33a943b7..4791779415 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableReadFunction.java @@ -102,14 +102,10 @@ default CompletableFuture> getAllAsync(Collection keys) { /** * Determine whether the current operation can be retried with the last thrown exception. - * The default implementation disables retries for any type of exceptions. This can be - * overridden to customize the policy based on the data store protocol. * @param exception exception thrown by a table operation * @return whether the operation can be retried */ - default boolean isRetriable(Throwable exception) { - return false; - } + boolean isRetriable(Throwable exception); // optionally implement readObject() to initialize transient states } diff --git a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java index b1676241a0..d9d619f888 100644 --- a/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/remote/TableWriteFunction.java @@ -144,14 +144,10 @@ default CompletableFuture deleteAllAsync(Collection keys) { /** * Determine whether the current operation can be retried with the last thrown exception. - * The default implementation disables retries for any type of exceptions. This can be - * overridden to customize the policy based on the data store protocol. * @param exception exception thrown by a table operation * @return whether the operation can be retried */ - default boolean isRetriable(Throwable exception) { - return false; - } + boolean isRetriable(Throwable exception); /** * Flush the remote store (optional) diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java index b5d64a3edb..5dcebc8a61 100644 --- a/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java +++ b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java @@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Predicate; import org.apache.samza.SamzaException; @@ -37,10 +36,9 @@ class FailsafeAdapter { /** * Convert the {@link TableRetryPolicy} to failsafe {@link RetryPolicy}. - * @param isRetriable predicate to signal retriable exceptions. * @return this policy instance */ - static RetryPolicy valueOf(Predicate isRetriable, TableRetryPolicy policy) { + static RetryPolicy valueOf(TableRetryPolicy policy) { RetryPolicy failSafePolicy = new RetryPolicy(); switch (policy.getBackoffType()) { @@ -74,7 +72,7 @@ static RetryPolicy valueOf(Predicate isRetriable, TableRetryPolicy po failSafePolicy.withJitter(policy.getJitter().toMillis(), TimeUnit.MILLISECONDS); } - failSafePolicy.retryOn((e) -> isRetriable.test(e)); + failSafePolicy.retryOn(e -> policy.getRetryOn().test(e)); return failSafePolicy; } diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java index 4a13b8620b..31b788820e 100644 --- a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java @@ -63,7 +63,8 @@ public RetriableReadFunction(TableRetryPolicy policy, TableReadFunction re this.readFn = readFn; this.retryExecutor = retryExecutor; - this.retryPolicy = FailsafeAdapter.valueOf((ex) -> readFn.isRetriable(ex), policy); + policy.withRetryOn((ex) -> readFn.isRetriable(ex) || policy.getRetryOn().test(ex)); + this.retryPolicy = FailsafeAdapter.valueOf(policy); } @Override diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java index 5b72a134d8..f80458e5f2 100644 --- a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java @@ -63,7 +63,8 @@ public RetriableWriteFunction(TableRetryPolicy policy, TableWriteFunction this.writeFn = writeFn; this.retryExecutor = retryExecutor; - this.retryPolicy = FailsafeAdapter.valueOf((ex) -> writeFn.isRetriable(ex), policy); + policy.withRetryOn((ex) -> writeFn.isRetriable(ex) || policy.getRetryOn().test(ex)); + this.retryPolicy = FailsafeAdapter.valueOf(policy); } @Override diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java index 4c522b3412..1bee948178 100644 --- a/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java +++ b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java @@ -21,6 +21,9 @@ import java.io.Serializable; import java.time.Duration; +import java.util.function.Predicate; + +import com.google.common.base.Preconditions; /** @@ -70,12 +73,24 @@ enum BackoffType { // By default no backoff during retries private BackoffType backoffType = BackoffType.NONE; + /** + * Serializable adapter interface for {@link java.util.function.Predicate}. + * This is needed because TableRetryPolicy needs to be serializable as part of the + * table config whereas {@link java.util.function.Predicate} is not serializable. + */ + public interface RetryPredicate extends Predicate, Serializable { + } + + // By default no custom retry predicate so retry decision is made solely by the table functions + private RetryPredicate isRetriable = (ex) -> false; + /** * Set the sleepTime time for the fixed backoff policy. * @param sleepTime sleepTime time * @return this policy instance */ public TableRetryPolicy withFixedBackoff(Duration sleepTime) { + Preconditions.checkNotNull(sleepTime); this.sleepTime = sleepTime; this.backoffType = BackoffType.FIXED; return this; @@ -89,6 +104,8 @@ public TableRetryPolicy withFixedBackoff(Duration sleepTime) { * @return this policy instance */ public TableRetryPolicy withRandomBackoff(Duration minSleep, Duration maxSleep) { + Preconditions.checkNotNull(minSleep); + Preconditions.checkNotNull(maxSleep); this.randomMin = minSleep; this.randomMax = maxSleep; this.backoffType = BackoffType.RANDOM; @@ -105,6 +122,8 @@ public TableRetryPolicy withRandomBackoff(Duration minSleep, Duration maxSleep) * @return this policy instance */ public TableRetryPolicy withExponentialBackoff(Duration sleepTime, Duration maxSleep, double factor) { + Preconditions.checkNotNull(sleepTime); + Preconditions.checkNotNull(maxSleep); this.sleepTime = sleepTime; this.exponentialMaxSleep = maxSleep; this.exponentialFactor = factor; @@ -121,6 +140,7 @@ public TableRetryPolicy withExponentialBackoff(Duration sleepTime, Duration maxS * @return this policy instance */ public TableRetryPolicy withJitter(Duration jitter) { + Preconditions.checkNotNull(jitter); if (backoffType != BackoffType.RANDOM) { this.jitter = jitter; } @@ -133,6 +153,7 @@ public TableRetryPolicy withJitter(Duration jitter) { * @return this policy instance */ public TableRetryPolicy withStopAfterAttempts(int maxAttempts) { + Preconditions.checkArgument(maxAttempts >= 0); this.maxAttempts = maxAttempts; return this; } @@ -143,10 +164,24 @@ public TableRetryPolicy withStopAfterAttempts(int maxAttempts) { * @return this policy instance */ public TableRetryPolicy withStopAfterDelay(Duration maxDelay) { + Preconditions.checkNotNull(maxDelay); this.maxDuration = maxDelay; return this; } + /** + * Set the predicate to use for identifying retriable exceptions. If specified, table + * retry logic will consult both such predicate and table function and retry will be + * attempted if either option returns true. + * @param isRetriable predicate for retriable exception identification + * @return this policy instance + */ + public TableRetryPolicy withRetryOn(RetryPredicate isRetriable) { + Preconditions.checkNotNull(isRetriable); + this.isRetriable = isRetriable; + return this; + } + /** * @return initial/fixed sleep time. */ @@ -192,7 +227,7 @@ public Duration getJitter() { /** * Termination after a fix number of attempts. - * @return maximum number of attempts without success before giving up the operation. + * @return maximum number of attempts without success before giving up the operation or null if not set. */ public Integer getMaxAttempts() { return maxAttempts; @@ -200,7 +235,7 @@ public Integer getMaxAttempts() { /** * Termination after a fixed duration. - * @return maximum duration without success before giving up the operation. + * @return maximum duration without success before giving up the operation or null if not set. */ public Duration getMaxDuration() { return maxDuration; @@ -212,4 +247,11 @@ public Duration getMaxDuration() { public BackoffType getBackoffType() { return backoffType; } + + /** + * @return Custom predicate for retriable exception identification or null if not specified. + */ + public RetryPredicate getRetryOn() { + return isRetriable; + } } diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java index 6093173c6b..a28e4ab45a 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java @@ -141,7 +141,9 @@ public int getCredits(K key, V value) { private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean rlGets, boolean rlPuts) { int numRateLimitOps = (rlGets ? 1 : 0) + (rlPuts ? 1 : 0); RemoteTableDescriptor desc = new RemoteTableDescriptor("1"); - desc.withReadFunction(mock(TableReadFunction.class), new TableRetryPolicy()); + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withRetryOn((ex) -> false); + desc.withReadFunction(mock(TableReadFunction.class), retryPolicy); desc.withWriteFunction(mock(TableWriteFunction.class)); desc.withAsyncCallbackExecutorPoolSize(10); diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java index 2c6ca9edfb..ac608d9c18 100644 --- a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java +++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java @@ -276,4 +276,41 @@ public void testRetryExhaustedAttemptsPut() throws Exception { Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); } + + @Test + public void testMixedIsRetriablePredicates() throws Exception { + String tableId = "testMixedIsRetriablePredicates"; + TableRetryPolicy policy = new TableRetryPolicy(); + policy.withFixedBackoff(Duration.ofMillis(100)); + + // Retry should be attempted based on the custom classification, ie. retry on NPE + policy.withRetryOn(ex -> ex instanceof NullPointerException); + + TableReadFunction readFn = mock(TableReadFunction.class); + + // Table fn classification only retries on IllegalArgumentException + doAnswer(arg -> arg.getArgumentAt(0, Throwable.class) instanceof IllegalArgumentException).when(readFn).isRetriable(any()); + + int [] times = new int[1]; + doAnswer(arg -> { + if (times[0]++ == 0) { + CompletableFuture future = new CompletableFuture(); + future.completeExceptionally(new NullPointerException("test exception")); + return future; + } else { + return CompletableFuture.completedFuture("bar"); + } + }).when(readFn).getAsync(any()); + + RetriableReadFunction retryIO = new RetriableReadFunction<>(policy, readFn, schedExec); + retryIO.setMetrics(getMetricsUtil(tableId)); + + Assert.assertEquals("bar", retryIO.getAsync("foo").get()); + + verify(readFn, times(2)).getAsync(anyString()); + Assert.assertEquals(1, retryIO.retryMetrics.retryCount.getCount()); + Assert.assertEquals(0, retryIO.retryMetrics.successCount.getCount()); + Assert.assertTrue(retryIO.retryMetrics.retryTimer.getSnapshot().getMax() > 0); + } + } diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java index 2019294b32..a67afafa43 100644 --- a/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java +++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java @@ -41,10 +41,11 @@ public void testFixedRetry() { retryPolicy.withJitter(Duration.ofMillis(100)); retryPolicy.withStopAfterAttempts(4); Assert.assertEquals(TableRetryPolicy.BackoffType.FIXED, retryPolicy.getBackoffType()); - RetryPolicy fsRetry = FailsafeAdapter.valueOf((e) -> true, retryPolicy); + RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy); Assert.assertEquals(1000, fsRetry.getDelay().toMillis()); Assert.assertEquals(100, fsRetry.getJitter().toMillis()); Assert.assertEquals(4, fsRetry.getMaxRetries()); + Assert.assertNotNull(retryPolicy.getRetryOn()); } @Test @@ -53,7 +54,7 @@ public void testRandomRetry() { retryPolicy.withRandomBackoff(Duration.ofMillis(1000), Duration.ofMillis(2000)); retryPolicy.withJitter(Duration.ofMillis(100)); // no-op Assert.assertEquals(TableRetryPolicy.BackoffType.RANDOM, retryPolicy.getBackoffType()); - RetryPolicy fsRetry = FailsafeAdapter.valueOf((e) -> true, retryPolicy); + RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy); Assert.assertEquals(1000, fsRetry.getDelayMin().toMillis()); Assert.assertEquals(2000, fsRetry.getDelayMax().toMillis()); } @@ -64,10 +65,18 @@ public void testExponentialRetry() { retryPolicy.withExponentialBackoff(Duration.ofMillis(1000), Duration.ofMillis(2000), 1.5); retryPolicy.withJitter(Duration.ofMillis(100)); Assert.assertEquals(TableRetryPolicy.BackoffType.EXPONENTIAL, retryPolicy.getBackoffType()); - RetryPolicy fsRetry = FailsafeAdapter.valueOf((e) -> true, retryPolicy); + RetryPolicy fsRetry = FailsafeAdapter.valueOf(retryPolicy); Assert.assertEquals(1000, fsRetry.getDelay().toMillis()); Assert.assertEquals(2000, fsRetry.getMaxDelay().toMillis()); Assert.assertEquals(1.5, fsRetry.getDelayFactor(), 0.001); Assert.assertEquals(100, fsRetry.getJitter().toMillis()); } + + @Test + public void testCustomRetryPredicate() { + TableRetryPolicy retryPolicy = new TableRetryPolicy(); + retryPolicy.withRetryOn((e) -> e instanceof IllegalArgumentException); + Assert.assertTrue(retryPolicy.getRetryOn().test(new IllegalArgumentException())); + Assert.assertFalse(retryPolicy.getRetryOn().test(new NullPointerException())); + } } From 5a44f8793875bb194e8e5861f39b0a3c97c790c9 Mon Sep 17 00:00:00 2001 From: Peng Du Date: Fri, 14 Sep 2018 10:10:22 -0700 Subject: [PATCH 4/4] Address Xinyu's comment --- .../samza/table/retry/FailsafeAdapter.java | 2 +- .../table/retry/RetriableReadFunction.java | 8 +++--- .../table/retry/RetriableWriteFunction.java | 12 +++++---- .../samza/table/retry/TableRetryPolicy.java | 14 +++++----- .../remote/TestRemoteTableDescriptor.java | 2 +- .../retry/TestRetriableTableFunctions.java | 2 +- .../table/retry/TestTableRetryPolicy.java | 8 +++--- .../samza/test/table/TestRemoteTable.java | 27 ++++++++++++++++++- .../table/TestTableDescriptorsProvider.java | 18 +++++++++++-- 9 files changed, 68 insertions(+), 25 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java index 5dcebc8a61..b2eccd884a 100644 --- a/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java +++ b/samza-core/src/main/java/org/apache/samza/table/retry/FailsafeAdapter.java @@ -72,7 +72,7 @@ static RetryPolicy valueOf(TableRetryPolicy policy) { failSafePolicy.withJitter(policy.getJitter().toMillis(), TimeUnit.MILLISECONDS); } - failSafePolicy.retryOn(e -> policy.getRetryOn().test(e)); + failSafePolicy.retryOn(e -> policy.getRetryPredicate().test(e)); return failSafePolicy; } diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java index 31b788820e..1adddc0f49 100644 --- a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Predicate; import org.apache.samza.SamzaException; import org.apache.samza.table.remote.TableReadFunction; @@ -63,14 +64,15 @@ public RetriableReadFunction(TableRetryPolicy policy, TableReadFunction re this.readFn = readFn; this.retryExecutor = retryExecutor; - policy.withRetryOn((ex) -> readFn.isRetriable(ex) || policy.getRetryOn().test(ex)); + Predicate retryPredicate = policy.getRetryPredicate(); + policy.withRetryPredicate((ex) -> readFn.isRetriable(ex) || retryPredicate.test(ex)); this.retryPolicy = FailsafeAdapter.valueOf(policy); } @Override public CompletableFuture getAsync(K key) { return failsafe(retryPolicy, retryMetrics, retryExecutor) - .future(k -> readFn.getAsync(key)) + .future(() -> readFn.getAsync(key)) .exceptionally(e -> { throw new SamzaException("Failed to get the record for " + key + " after retries.", e); }); @@ -79,7 +81,7 @@ public CompletableFuture getAsync(K key) { @Override public CompletableFuture> getAllAsync(Collection keys) { return failsafe(retryPolicy, retryMetrics, retryExecutor) - .future(k -> readFn.getAllAsync(keys)) + .future(() -> readFn.getAllAsync(keys)) .exceptionally(e -> { throw new SamzaException("Failed to get the records for " + keys + " after retries.", e); }); diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java index f80458e5f2..2f3f062ff9 100644 --- a/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java +++ b/samza-core/src/main/java/org/apache/samza/table/retry/RetriableWriteFunction.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Predicate; import org.apache.samza.SamzaException; import org.apache.samza.storage.kv.Entry; @@ -63,14 +64,15 @@ public RetriableWriteFunction(TableRetryPolicy policy, TableWriteFunction this.writeFn = writeFn; this.retryExecutor = retryExecutor; - policy.withRetryOn((ex) -> writeFn.isRetriable(ex) || policy.getRetryOn().test(ex)); + Predicate retryPredicate = policy.getRetryPredicate(); + policy.withRetryPredicate((ex) -> writeFn.isRetriable(ex) || retryPredicate.test(ex)); this.retryPolicy = FailsafeAdapter.valueOf(policy); } @Override public CompletableFuture putAsync(K key, V record) { return failsafe(retryPolicy, retryMetrics, retryExecutor) - .future(k -> writeFn.putAsync(key, record)) + .future(() -> writeFn.putAsync(key, record)) .exceptionally(e -> { throw new SamzaException("Failed to get the record for " + key + " after retries.", e); }); @@ -79,7 +81,7 @@ public CompletableFuture putAsync(K key, V record) { @Override public CompletableFuture putAllAsync(Collection> records) { return failsafe(retryPolicy, retryMetrics, retryExecutor) - .future(k -> writeFn.putAllAsync(records)) + .future(() -> writeFn.putAllAsync(records)) .exceptionally(e -> { throw new SamzaException("Failed to put records after retries.", e); }); @@ -88,7 +90,7 @@ public CompletableFuture putAllAsync(Collection> records) { @Override public CompletableFuture deleteAsync(K key) { return failsafe(retryPolicy, retryMetrics, retryExecutor) - .future(k -> writeFn.deleteAsync(key)) + .future(() -> writeFn.deleteAsync(key)) .exceptionally(e -> { throw new SamzaException("Failed to delete the record for " + key + " after retries.", e); }); @@ -97,7 +99,7 @@ public CompletableFuture deleteAsync(K key) { @Override public CompletableFuture deleteAllAsync(Collection keys) { return failsafe(retryPolicy, retryMetrics, retryExecutor) - .future(k -> writeFn.deleteAllAsync(keys)) + .future(() -> writeFn.deleteAllAsync(keys)) .exceptionally(e -> { throw new SamzaException("Failed to delete the records for " + keys + " after retries.", e); }); diff --git a/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java index 1bee948178..162eb07ed6 100644 --- a/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java +++ b/samza-core/src/main/java/org/apache/samza/table/retry/TableRetryPolicy.java @@ -82,7 +82,7 @@ public interface RetryPredicate extends Predicate, Serializable { } // By default no custom retry predicate so retry decision is made solely by the table functions - private RetryPredicate isRetriable = (ex) -> false; + private RetryPredicate retryPredicate = (ex) -> false; /** * Set the sleepTime time for the fixed backoff policy. @@ -173,12 +173,12 @@ public TableRetryPolicy withStopAfterDelay(Duration maxDelay) { * Set the predicate to use for identifying retriable exceptions. If specified, table * retry logic will consult both such predicate and table function and retry will be * attempted if either option returns true. - * @param isRetriable predicate for retriable exception identification + * @param retryPredicate predicate for retriable exception identification * @return this policy instance */ - public TableRetryPolicy withRetryOn(RetryPredicate isRetriable) { - Preconditions.checkNotNull(isRetriable); - this.isRetriable = isRetriable; + public TableRetryPolicy withRetryPredicate(RetryPredicate retryPredicate) { + Preconditions.checkNotNull(retryPredicate); + this.retryPredicate = retryPredicate; return this; } @@ -251,7 +251,7 @@ public BackoffType getBackoffType() { /** * @return Custom predicate for retriable exception identification or null if not specified. */ - public RetryPredicate getRetryOn() { - return isRetriable; + public RetryPredicate getRetryPredicate() { + return retryPredicate; } } diff --git a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java index a28e4ab45a..efe1acf229 100644 --- a/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java +++ b/samza-core/src/test/java/org/apache/samza/table/remote/TestRemoteTableDescriptor.java @@ -142,7 +142,7 @@ private void doTestDeserializeReadFunctionAndLimiter(boolean rateOnly, boolean r int numRateLimitOps = (rlGets ? 1 : 0) + (rlPuts ? 1 : 0); RemoteTableDescriptor desc = new RemoteTableDescriptor("1"); TableRetryPolicy retryPolicy = new TableRetryPolicy(); - retryPolicy.withRetryOn((ex) -> false); + retryPolicy.withRetryPredicate((ex) -> false); desc.withReadFunction(mock(TableReadFunction.class), retryPolicy); desc.withWriteFunction(mock(TableWriteFunction.class)); desc.withAsyncCallbackExecutorPoolSize(10); diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java index ac608d9c18..9dd5a746d1 100644 --- a/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java +++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestRetriableTableFunctions.java @@ -284,7 +284,7 @@ public void testMixedIsRetriablePredicates() throws Exception { policy.withFixedBackoff(Duration.ofMillis(100)); // Retry should be attempted based on the custom classification, ie. retry on NPE - policy.withRetryOn(ex -> ex instanceof NullPointerException); + policy.withRetryPredicate(ex -> ex instanceof NullPointerException); TableReadFunction readFn = mock(TableReadFunction.class); diff --git a/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java index a67afafa43..c343d63936 100644 --- a/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java +++ b/samza-core/src/test/java/org/apache/samza/table/retry/TestTableRetryPolicy.java @@ -45,7 +45,7 @@ public void testFixedRetry() { Assert.assertEquals(1000, fsRetry.getDelay().toMillis()); Assert.assertEquals(100, fsRetry.getJitter().toMillis()); Assert.assertEquals(4, fsRetry.getMaxRetries()); - Assert.assertNotNull(retryPolicy.getRetryOn()); + Assert.assertNotNull(retryPolicy.getRetryPredicate()); } @Test @@ -75,8 +75,8 @@ public void testExponentialRetry() { @Test public void testCustomRetryPredicate() { TableRetryPolicy retryPolicy = new TableRetryPolicy(); - retryPolicy.withRetryOn((e) -> e instanceof IllegalArgumentException); - Assert.assertTrue(retryPolicy.getRetryOn().test(new IllegalArgumentException())); - Assert.assertFalse(retryPolicy.getRetryOn().test(new NullPointerException())); + retryPolicy.withRetryPredicate((e) -> e instanceof IllegalArgumentException); + Assert.assertTrue(retryPolicy.getRetryPredicate().test(new IllegalArgumentException())); + Assert.assertFalse(retryPolicy.getRetryPredicate().test(new NullPointerException())); } } diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java index e6e73bb4d9..6f62782dd1 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestRemoteTable.java @@ -93,6 +93,11 @@ public CompletableFuture getAsync(Integer key) { return CompletableFuture.completedFuture(profileMap.get(key)); } + @Override + public boolean isRetriable(Throwable exception) { + return false; + } + static InMemoryReadFunction getInMemoryReadFunction(String serializedProfiles) { return new InMemoryReadFunction(serializedProfiles); } @@ -125,6 +130,11 @@ public CompletableFuture deleteAsync(Integer key) { records.remove(key); return CompletableFuture.completedFuture(null); } + + @Override + public boolean isRetriable(Throwable exception) { + return false; + } } private Table> getCachingTable(Table> actualTable, boolean defaultCache, String id, StreamGraph streamGraph) { @@ -143,6 +153,18 @@ private Table> getCachingTable(Table> actualTable, bool return streamGraph.getTable(cachingDesc); } + static class MyReadFunction implements TableReadFunction { + @Override + public CompletableFuture getAsync(Object key) { + return null; + } + + @Override + public boolean isRetriable(Throwable exception) { + return false; + } + } + private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean defaultCache, String testName) throws Exception { final InMemoryWriteFunction writer = new InMemoryWriteFunction(testName); @@ -168,9 +190,12 @@ private void doTestStreamTableJoinRemoteTable(boolean withCache, boolean default .withReadFunction(InMemoryReadFunction.getInMemoryReadFunction(profiles)) .withRateLimiter(readRateLimiter, null, null); + // dummy reader + TableReadFunction readFn = new MyReadFunction(); + RemoteTableDescriptor outputTableDesc = new RemoteTableDescriptor<>("enriched-page-view-table-1"); outputTableDesc - .withReadFunction(key -> null) // dummy reader + .withReadFunction(readFn) .withWriteFunction(writer) .withRateLimiter(writeRateLimiter, null, null); diff --git a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java index 41b6509fc9..34ffbd49fd 100644 --- a/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java +++ b/samza-test/src/test/java/org/apache/samza/test/table/TestTableDescriptorsProvider.java @@ -24,6 +24,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; + import org.apache.samza.config.Config; import org.apache.samza.config.ConfigException; import org.apache.samza.config.ConfigRewriter; @@ -116,15 +118,27 @@ public void testWithTableDescriptorsProviderClass() throws Exception { public static class MySampleNonTableDescriptorsProvider { } + static class MyReadFunction implements TableReadFunction { + @Override + public CompletableFuture getAsync(Object key) { + return null; + } + + @Override + public boolean isRetriable(Throwable exception) { + return false; + } + } + public static class MySampleTableDescriptorsProvider implements TableDescriptorsProvider { @Override public List getTableDescriptors(Config config) { List tableDescriptors = new ArrayList<>(); final RateLimiter readRateLimiter = mock(RateLimiter.class); - final TableReadFunction readRemoteTable = (TableReadFunction) key -> null; + final MyReadFunction readFn = new MyReadFunction(); tableDescriptors.add(new RemoteTableDescriptor<>("remote-table-1") - .withReadFunction(readRemoteTable) + .withReadFunction(readFn) .withRateLimiter(readRateLimiter, null, null) .withSerde(KVSerde.of(new StringSerde(), new LongSerde()))); tableDescriptors.add(new RocksDbTableDescriptor("local-table-1")