Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ project(":samza-core_$scalaVersion") {
compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
compile "net.jodah:failsafe:$failsafeVersion"
testCompile project(":samza-api").sourceSets.test.output
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-core:$mockitoVersion"
Expand Down
1 change: 1 addition & 0 deletions gradle/dependency-versions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@
yarnVersion = "2.6.1"
zkClientVersion = "0.8"
zookeeperVersion = "3.4.6"
failsafeVersion = "1.1.0"
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@
* @param <V> the type of the value in this table
*/
public class RemoteReadWriteTable<K, V> extends RemoteReadableTable<K, V> implements ReadWriteTable<K, V> {
private final TableWriteFunction<K, V> writeFn;

private DefaultTableWriteMetrics writeMetrics;

@VisibleForTesting
final TableWriteFunction<K, V> writeFn;
final TableRateLimiter writeRateLimiter;

public RemoteReadWriteTable(String tableId, TableReadFunction readFn, TableWriteFunction writeFn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ public class RemoteReadableTable<K, V> implements ReadableTable<K, V> {
protected final ExecutorService callbackExecutor;
protected final ExecutorService tableExecutor;

private final TableReadFunction<K, V> readFn;
private DefaultTableReadMetrics readMetrics;

@VisibleForTesting
final TableReadFunction<K, V> readFn;
final TableRateLimiter<K, V> readRateLimiter;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.apache.samza.operators.BaseTableDescriptor;
import org.apache.samza.table.TableSpec;
import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.table.utils.SerdeUtils;
import org.apache.samza.util.EmbeddedTaggedRateLimiter;
import org.apache.samza.util.RateLimiter;
Expand Down Expand Up @@ -70,6 +71,9 @@ public class RemoteTableDescriptor<K, V> extends BaseTableDescriptor<K, V, Remot
private TableRateLimiter.CreditFunction<K, V> readCreditFn;
private TableRateLimiter.CreditFunction<K, V> writeCreditFn;

private TableRetryPolicy readRetryPolicy;
private TableRetryPolicy writeRetryPolicy;

// By default execute future callbacks on the native client threads
// ie. no additional thread pool for callbacks.
private int asyncCallbackPoolSize = -1;
Expand Down Expand Up @@ -115,13 +119,23 @@ public TableSpec getTableSpec() {
"write credit function", writeCreditFn));
}

if (readRetryPolicy != null) {
tableSpecConfig.put(RemoteTableProvider.READ_RETRY_POLICY, SerdeUtils.serialize(
"read retry policy", readRetryPolicy));
}

if (writeRetryPolicy != null) {
tableSpecConfig.put(RemoteTableProvider.WRITE_RETRY_POLICY, SerdeUtils.serialize(
"write retry policy", writeRetryPolicy));
}

tableSpecConfig.put(RemoteTableProvider.ASYNC_CALLBACK_POOL_SIZE, String.valueOf(asyncCallbackPoolSize));

return new TableSpec(tableId, serde, RemoteTableProviderFactory.class.getName(), tableSpecConfig);
}

/**
* Use specified TableReadFunction with remote table.
* Use specified TableReadFunction with remote table and a retry policy.
* @param readFn read function instance
* @return this table descriptor instance
*/
Expand All @@ -132,7 +146,7 @@ public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> read
}

/**
* Use specified TableWriteFunction with remote table.
* Use specified TableWriteFunction with remote table and a retry policy.
* @param writeFn write function instance
* @return this table descriptor instance
*/
Expand All @@ -142,6 +156,34 @@ public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> wr
return this;
}

/**
* Use specified TableReadFunction with remote table.
* @param readFn read function instance
* @param retryPolicy retry policy for the read function
* @return this table descriptor instance
*/
public RemoteTableDescriptor<K, V> withReadFunction(TableReadFunction<K, V> readFn, TableRetryPolicy retryPolicy) {
Preconditions.checkNotNull(readFn, "null read function");
Preconditions.checkNotNull(retryPolicy, "null retry policy");
this.readFn = readFn;
this.readRetryPolicy = retryPolicy;
return this;
}

/**
* Use specified TableWriteFunction with remote table.
* @param writeFn write function instance
* @param retryPolicy retry policy for the write function
* @return this table descriptor instance
*/
public RemoteTableDescriptor<K, V> withWriteFunction(TableWriteFunction<K, V> writeFn, TableRetryPolicy retryPolicy) {
Preconditions.checkNotNull(writeFn, "null write function");
Preconditions.checkNotNull(retryPolicy, "null retry policy");
this.writeFn = writeFn;
this.writeRetryPolicy = retryPolicy;
return this;
}

/**
* Specify a rate limiter along with credit functions to map a table record (as KV) to the amount
* of credits to be charged from the rate limiter for table read and write operations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.samza.table.Table;
import org.apache.samza.table.TableSpec;
import org.apache.samza.table.retry.RetriableReadFunction;
import org.apache.samza.table.retry.RetriableWriteFunction;
import org.apache.samza.table.retry.TableRetryPolicy;
import org.apache.samza.table.utils.BaseTableProvider;
import org.apache.samza.table.utils.SerdeUtils;
import org.apache.samza.table.utils.TableMetricsUtil;
import org.apache.samza.util.RateLimiter;

import static org.apache.samza.table.remote.RemoteTableDescriptor.RL_READ_TAG;
Expand All @@ -47,6 +52,8 @@ public class RemoteTableProvider extends BaseTableProvider {
static final String READ_CREDIT_FN = "io.read.credit.func";
static final String WRITE_CREDIT_FN = "io.write.credit.func";
static final String ASYNC_CALLBACK_POOL_SIZE = "io.async.callback.pool.size";
static final String READ_RETRY_POLICY = "io.read.retry.policy";
static final String WRITE_RETRY_POLICY = "io.write.retry.policy";

private final boolean readOnly;
private final List<RemoteReadableTable<?, ?>> tables = new ArrayList<>();
Expand All @@ -58,6 +65,7 @@ public class RemoteTableProvider extends BaseTableProvider {
*/
private static Map<String, ExecutorService> tableExecutors = new ConcurrentHashMap<>();
private static Map<String, ExecutorService> callbackExecutors = new ConcurrentHashMap<>();
private static ScheduledExecutorService retryExecutor;

public RemoteTableProvider(TableSpec tableSpec) {
super(tableSpec);
Expand All @@ -72,7 +80,7 @@ public Table getTable() {
RemoteReadableTable table;
String tableId = tableSpec.getId();

TableReadFunction<?, ?> readFn = getReadFn();
TableReadFunction readFn = getReadFn();
RateLimiter rateLimiter = deserializeObject(RATE_LIMITER);
if (rateLimiter != null) {
rateLimiter.init(containerContext.config, taskContext);
Expand All @@ -83,11 +91,33 @@ public Table getTable() {
TableRateLimiter.CreditFunction<?, ?> writeCreditFn;
TableRateLimiter writeRateLimiter = null;

TableRetryPolicy readRetryPolicy = deserializeObject(READ_RETRY_POLICY);
TableRetryPolicy writeRetryPolicy = null;

if ((readRetryPolicy != null || writeRetryPolicy != null) && retryExecutor == null) {
retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = new Thread(runnable);
thread.setName("table-retry-executor");
thread.setDaemon(true);
return thread;
});
}

if (readRetryPolicy != null) {
readFn = new RetriableReadFunction<>(readRetryPolicy, readFn, retryExecutor);
}

TableWriteFunction writeFn = getWriteFn();

boolean isRateLimited = readRateLimiter.isRateLimited();
if (!readOnly) {
writeCreditFn = deserializeObject(WRITE_CREDIT_FN);
writeRateLimiter = new TableRateLimiter(tableSpec.getId(), rateLimiter, writeCreditFn, RL_WRITE_TAG);
isRateLimited |= writeRateLimiter.isRateLimited();
writeRetryPolicy = deserializeObject(WRITE_RETRY_POLICY);
if (writeRetryPolicy != null) {
writeFn = new RetriableWriteFunction(writeRetryPolicy, writeFn, retryExecutor);
}
}

// Optional executor for future callback/completion. Shared by both read and write operations.
Expand Down Expand Up @@ -116,10 +146,18 @@ public Table getTable() {
table = new RemoteReadableTable(tableSpec.getId(), readFn, readRateLimiter,
tableExecutors.get(tableId), callbackExecutors.get(tableId));
} else {
table = new RemoteReadWriteTable(tableSpec.getId(), readFn, getWriteFn(), readRateLimiter,
table = new RemoteReadWriteTable(tableSpec.getId(), readFn, writeFn, readRateLimiter,
writeRateLimiter, tableExecutors.get(tableId), callbackExecutors.get(tableId));
}

TableMetricsUtil metricsUtil = new TableMetricsUtil(containerContext, taskContext, table, tableId);
if (readRetryPolicy != null) {
((RetriableReadFunction) readFn).setMetrics(metricsUtil);
}
if (writeRetryPolicy != null) {
((RetriableWriteFunction) writeFn).setMetrics(metricsUtil);
}

table.init(containerContext, taskContext);
tables.add(table);
return table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,5 +100,12 @@ default CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys) {
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().join())));
}

/**
* Determine whether the current operation can be retried with the last thrown exception.
* @param exception exception thrown by a table operation
* @return whether the operation can be retried
*/
boolean isRetriable(Throwable exception);

// optionally implement readObject() to initialize transient states
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,13 @@ default CompletableFuture<Void> deleteAllAsync(Collection<K> keys) {
return CompletableFuture.allOf(Iterables.toArray(deleteFutures, CompletableFuture.class));
}

/**
* Determine whether the current operation can be retried with the last thrown exception.
* @param exception exception thrown by a table operation
* @return whether the operation can be retried
*/
boolean isRetriable(Throwable exception);

/**
* Flush the remote store (optional)
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.table.retry;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.samza.SamzaException;

import net.jodah.failsafe.AsyncFailsafe;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;


/**
* Helper class adapting the generic {@link TableRetryPolicy} to a failsafe {@link RetryPolicy} and
* creating failsafe retryer instances with proper metrics management.
*/
class FailsafeAdapter {
/**
* Convert the {@link TableRetryPolicy} to failsafe {@link RetryPolicy}.
* @return this policy instance
*/
static RetryPolicy valueOf(TableRetryPolicy policy) {
RetryPolicy failSafePolicy = new RetryPolicy();

switch (policy.getBackoffType()) {
case NONE:
break;

case FIXED:
failSafePolicy.withDelay(policy.getSleepTime().toMillis(), TimeUnit.MILLISECONDS);
break;

case RANDOM:
failSafePolicy.withDelay(policy.getRandomMin().toMillis(), policy.getRandomMax().toMillis(), TimeUnit.MILLISECONDS);
break;

case EXPONENTIAL:
failSafePolicy.withBackoff(policy.getSleepTime().toMillis(), policy.getExponentialMaxSleep().toMillis(), TimeUnit.MILLISECONDS,
policy.getExponentialFactor());
break;

default:
throw new SamzaException("Unknown retry policy type.");
}

if (policy.getMaxDuration() != null) {
failSafePolicy.withMaxDuration(policy.getMaxDuration().toMillis(), TimeUnit.MILLISECONDS);
}
if (policy.getMaxAttempts() != null) {
failSafePolicy.withMaxRetries(policy.getMaxAttempts());
}
if (policy.getJitter() != null && policy.getBackoffType() != TableRetryPolicy.BackoffType.RANDOM) {
failSafePolicy.withJitter(policy.getJitter().toMillis(), TimeUnit.MILLISECONDS);
}

failSafePolicy.retryOn(e -> policy.getRetryPredicate().test(e));

return failSafePolicy;
}

/**
* Obtain an async failsafe retryer instance with the specified policy, metrics, and executor service.
* @param retryPolicy retry policy
* @param metrics retry metrics
* @param retryExec executor service for scheduling async retries
* @return {@link net.jodah.failsafe.AsyncFailsafe} instance
*/
static AsyncFailsafe<?> failsafe(RetryPolicy retryPolicy, RetryMetrics metrics, ScheduledExecutorService retryExec) {
long startMs = System.currentTimeMillis();
return Failsafe.with(retryPolicy).with(retryExec)
.onRetry(e -> metrics.retryCount.inc())
.onRetriesExceeded(e -> {
metrics.retryTimer.update(System.currentTimeMillis() - startMs);
metrics.permFailureCount.inc();
})
.onSuccess((e, ctx) -> {
if (ctx.getExecutions() > 1) {
metrics.retryTimer.update(System.currentTimeMillis() - startMs);
} else {
metrics.successCount.inc();
}
});
}
}
Loading