Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
*/
package org.apache.samza.coordinator;

import org.apache.samza.SamzaException;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.zk.ZkCoordinationUtils;
import org.apache.samza.zk.ZkJobCoordinatorFactory;


/**
*
Expand All @@ -31,14 +37,17 @@
@InterfaceStability.Evolving
public interface CoordinationUtils {

/**
* reset the internal structure. Does not happen automatically with stop()
*/
void reset();
static CoordinationUtils getCoordinationUtils(String groupId, String participantId, Config config) {

// currently we figure out if it is ZK based utilities by checking JobCoordinatorConfig.JOB_COORDINATOR_FACTORY
String jobCoordinatorFactoryClassName = config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "");

if (!ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName))
throw new SamzaException(String.format("Samza supports only ZK based coordination utilities with %s == %s",
JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, ZkJobCoordinatorFactory.class.getName()));

// facilities for group coordination
LeaderElector getLeaderElector(); // leaderElector is unique based on the groupId
return new ZkCoordinationUtils(groupId, participantId, config);
}

Latch getLatch(int size, String latchId);
DistributedLock getLock(String lockId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.coordinator;

import org.apache.samza.config.Config;
import java.util.concurrent.TimeUnit;


/**
* factory to instantiate a {@link CoordinationUtils} service
*/
public interface CoordinationServiceFactory {
public interface DistributedLock {

/**
* Tries to acquire the lock
* @param timeout Duration of lock acquiring timeout.
* @param unit Time Unit of the timeout defined above.
* @return true if lock is acquired successfully, false if it times out.
*/
boolean lock(long timeout, TimeUnit unit);

/**
* Releases the lock
*/
void unlock();

/**
* get a unique service instance
* @param groupId - unique id to identify the service
* @param participantId - a unique id that identifies the participant in the service
* @param updatedConfig - configs, to define the details of the service
* @return a unique service instance
* perform any required action for closing
*/
CoordinationUtils getCoordinationService(String groupId, String participantId, Config updatedConfig);
}
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,24 @@

import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.JobCoordinatorConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.Latch;
import org.apache.samza.coordinator.LeaderElector;
import org.apache.samza.coordinator.DistributedLock;
import org.apache.samza.execution.ExecutionPlan;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.processor.StreamProcessor;
Expand All @@ -46,8 +47,6 @@
import org.apache.samza.task.AsyncStreamTaskFactory;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.task.TaskFactoryUtil;
import org.apache.samza.zk.ZkCoordinationServiceFactory;
import org.apache.samza.zk.ZkJobCoordinatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -57,13 +56,11 @@
public class LocalApplicationRunner extends AbstractApplicationRunner {

private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
// Latch id that's used for awaiting the init of application before creating the StreamProcessors
private static final String INIT_LATCH_ID = "init";
// Latch timeout is set to 10 min
private static final long LATCH_TIMEOUT_MINUTES = 10;
// Lock timeout is set to 10 seconds here, as we don't want to introduce a new config value currently.
private static final long LOCK_TIMEOUT_SECONDS = 10;

private final String uid;
private final CoordinationUtils coordinationUtils;

private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet();
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
private final AtomicInteger numProcessorsToStart = new AtomicInteger();
Expand Down Expand Up @@ -122,17 +119,13 @@ private void shutdownAndNotify() {
}
}

if (coordinationUtils != null) {
coordinationUtils.reset();
}
shutdownLatch.countDown();
}
}

public LocalApplicationRunner(Config config) {
super(config);
uid = UUID.randomUUID().toString();
coordinationUtils = createCoordinationUtils();
}

@Override
Expand Down Expand Up @@ -207,45 +200,54 @@ public void waitForFinish() {
}

/**
* Create the {@link CoordinationUtils} needed by the application runner, or null if it's not configured.
* @return an instance of {@link CoordinationUtils}
* Generates a unique lock ID which is consistent for all processors within the same application lifecycle.
* Each {@code StreamSpec} has an ID that is unique and is used for hashcode computation.
* @param intStreams list of {@link StreamSpec}s
* @return lock ID
*/
/* package private */ CoordinationUtils createCoordinationUtils() {
String jobCoordinatorFactoryClassName = config.get(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, "");

// TODO: we will need a better way to package the configs with application runner
if (ZkJobCoordinatorFactory.class.getName().equals(jobCoordinatorFactoryClassName)) {
ApplicationConfig appConfig = new ApplicationConfig(config);
return new ZkCoordinationServiceFactory().getCoordinationService(appConfig.getGlobalAppId(), uid, config);
} else {
return null;
}
private String generateLockId(List<StreamSpec> intStreams) {
  // The lock ID is the hash code of the list of stream IDs (taken in list order),
  // rendered as a decimal string. List hash codes depend on element order, so
  // callers that must agree on the ID need to pass the specs in the same order.
  int idListHash = Objects.hashCode(
      intStreams.stream()
          .map(spec -> spec.getId())
          .collect(Collectors.toList()));
  return Integer.toString(idListHash);
}

/**
* Create intermediate streams using {@link org.apache.samza.execution.StreamManager}.
* If {@link CoordinationUtils} is provided, this function will first invoke leader election, and the leader
* will create the streams. All the runner processes will wait on the latch that is released after the leader finishes
* stream creation.
* If {@link CoordinationUtils} is provided, this function will first acquire a lock, and then create the streams.
* All the runner processes will either wait till the time they acquire the lock, or timeout after the specified time.
* After stream creation, they will unlock and proceed normally.
* @param intStreams list of intermediate {@link StreamSpec}s
* @throws Exception exception for latch timeout
*/
/* package private */ void createStreams(List<StreamSpec> intStreams) throws Exception {
if (!intStreams.isEmpty()) {
if (coordinationUtils != null) {
Latch initLatch = coordinationUtils.getLatch(1, INIT_LATCH_ID);
LeaderElector leaderElector = coordinationUtils.getLeaderElector();
leaderElector.setLeaderElectorListener(() -> {
getStreamManager().createStreams(intStreams);
initLatch.countDown();
});
leaderElector.tryBecomeLeader();
initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
} else {
// each application process will try creating the streams, which
// requires stream creation to be idempotent
getStreamManager().createStreams(intStreams);
private void createStreams(List<StreamSpec> intStreams) {
  // Nothing to create: skip coordination entirely.
  if (intStreams.isEmpty()) {
    return;
  }

  // Try to obtain a distributed lock keyed on the intermediate streams. A SamzaException
  // here is treated as "no coordination available" and is only logged as a warning.
  DistributedLock initLock = null;
  try {
    CoordinationUtils coordinationUtils = CoordinationUtils.getCoordinationUtils(
        new ApplicationConfig(config).getGlobalAppId(), uid, config);
    initLock = coordinationUtils.getLock(generateLockId(intStreams));
  } catch (SamzaException e) {
    LOG.warn("Coordination utils are not available.", e);
  }

  if (initLock == null) {
    // each application process will try creating the streams, which
    // requires stream creation to be idempotent
    getStreamManager().createStreams(intStreams);
    return;
  }

  try {
    boolean hasLock = initLock.lock(LOCK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    if (hasLock) {
      try {
        getStreamManager().createStreams(intStreams);
      } finally {
        // Release the lock even when stream creation throws, so the explicit
        // unlock is never skipped on the failure path.
        initLock.unlock();
      }
    } else {
      // On timeout the error is logged and this method returns without creating the streams.
      LOG.error("Timed out while trying to acquire lock.", new TimeoutException());
    }
  } finally {
    // Always give the lock a chance to clean up, whether or not it was acquired.
    initLock.close();
  }
}

Expand Down

This file was deleted.

Loading