@PawasChhokra
Contributor

No description provided.

/**
* Check if the stream described by the spec already exists.
* @param streamSpec The spec, or blueprint for the physical stream on the system.
* @return true if stream exists already, false otherwise
Contributor

nit. the stream

Contributor Author

It seems like we won't be needing this function at all. Hence removed.


Latch getLatch(int size, String latchId);

Lock getLock();
Contributor

I'd rather name it CoordinationLock (or something similar), to reduce confusion between this and the Java concurrency lock.

Contributor Author

@PawasChhokra PawasChhokra Aug 11, 2017

Renamed it to DistributedLock :)


boolean hasLock();

void setLockListener(LockListener listener);
Contributor

I personally think that the lock should be a synchronous utility. It makes programming much easier.
lock = getLock();

lock.lock()
.....
lock.release/unlock()
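
The synchronous pattern described above can be sketched as follows. This is a minimal illustration, not the interface that was merged: `CoordinationLock`, `LocalCoordinationLock`, and `runExclusively` are hypothetical names, and the in-process `ReentrantLock` stands in for a real distributed lock so the sketch runs without ZooKeeper.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical synchronous lock interface (illustrative names only).
interface CoordinationLock {
  boolean lock(long timeout, TimeUnit unit) throws InterruptedException;

  void unlock();
}

// In-process stand-in backed by a ReentrantLock; a real implementation
// would talk to a coordination service instead.
class LocalCoordinationLock implements CoordinationLock {
  private final ReentrantLock delegate = new ReentrantLock();

  @Override
  public boolean lock(long timeout, TimeUnit unit) throws InterruptedException {
    return delegate.tryLock(timeout, unit);
  }

  @Override
  public void unlock() {
    delegate.unlock();
  }
}

class SyncLockExample {
  // Runs the critical section only if the lock is acquired in time,
  // and always releases the lock afterwards.
  static boolean runExclusively(CoordinationLock lock, Runnable criticalSection) {
    try {
      if (!lock.lock(1, TimeUnit.SECONDS)) {
        return false; // timed out waiting for the lock
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    try {
      criticalSection.run();
      return true;
    } finally {
      lock.unlock();
    }
  }
}
```

With a blocking `lock()` call, the caller needs no listener: the critical section simply follows the acquisition, and the `finally` block guarantees the release.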

Contributor Author

Removed the listener interface.


/**
* Check if the streams described by the specs already exist.
* @param streams A list of stream specs, whose existence we need to check for
Contributor

nit. s/A/a/

Contributor Author

It seems like we won't be needing this function at all. Hence removed.

* @throws Exception exception for latch timeout
*/
private void createStreamsWithLock(List<StreamSpec> intStreams) throws Exception {
if (!intStreams.isEmpty()) {
Contributor

nit. I prefer if(intStreams.isEmpty()) return; to avoid unnecessary indentation. But that's personal preference.

if (coordinationUtils != null) {
Lock initLock = coordinationUtils.getLock();
LockListener lockListener = null;
if (coordinationUtils.getClass().getName().equals("org.apache.samza.zk.ZkCoordinationUtils")) {
Contributor

I know this is a shortcut, but I think it'd be better to put this logic into the coordinationUtils, not here. LocalApplicationRunner should not know about Zk stuff.

Contributor Author

Removed.


package org.apache.samza.zk;

public class TestZkLock {
Contributor

not very efficient test :)


//Start timer for timeout
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("ZkLock-%d").build());
scheduler.schedule(() -> {
Contributor

@shanthoosh shanthoosh Aug 11, 2017

As we discussed offline, this exception will not propagate to the main thread that is running the while loop, since an exception thrown by a thread-pool thread is swallowed by the thread pool.

So essentially all the followers will go into an infinite loop (as long as there's a leader).

Please share separate state (something like a CountDownLatch, AtomicBoolean, or volatile boolean) between the two threads to indicate that the periodic polling should stop.

Please correct me if i'm wrong.
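
A minimal sketch of the shared-state approach suggested above, assuming an `AtomicBoolean` as the flag. An exception thrown inside a scheduled task is captured in the task's future and never reaches the polling thread, so the timer only flips the flag and the polling loop checks it. `TimedPolling`, `pollUntilAcquired`, and the `tryAcquire` supplier are hypothetical names standing in for the real acquisition attempt.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

class TimedPolling {
  // Polls tryAcquire until it succeeds or the timer thread flips the shared flag.
  static boolean pollUntilAcquired(BooleanSupplier tryAcquire, long timeoutMs) {
    AtomicBoolean timedOut = new AtomicBoolean(false);
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    // The timer communicates through shared state instead of throwing.
    scheduler.schedule(() -> timedOut.set(true), timeoutMs, TimeUnit.MILLISECONDS);
    try {
      while (!timedOut.get()) {
        if (tryAcquire.getAsBoolean()) {
          return true;
        }
        try {
          Thread.sleep(10); // back off briefly between attempts
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return false;
        }
      }
      return false; // the timer fired before the lock was acquired
    } finally {
      scheduler.shutdownNow();
    }
  }
}
```

The caller can then decide on the polling thread whether a `false` result should become an exception.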

Contributor Author

Following this discussion, I have changed the lock timeout implementation. Kindly take a look at it and let me know if you have any comments. Thank you!

boolean hasLock();

void setLockListener(LockListener listener);

Contributor

nit: extra line

Contributor Author

Removed.

*/

package org.apache.samza.coordinator;

Contributor

Can you please add Javadocs? Also, add some explanation of what is expected from implementers of the interface.

Contributor Author

Added :)

*/
package org.apache.samza.coordinator;

public interface LockListener {
Contributor

Please add class level and method level java docs.

Contributor Author

Removed the class.

* @param streamSpec The spec, or blueprint for the physical stream on the system.
* @return true if stream exists already, false otherwise
*/
default boolean existStream(StreamSpec streamSpec) {
Contributor

Just throwing out a comment to see everyone's take. I usually prefer boolean methods to follow a question-like naming scheme to make them more readable. Since this is user-facing, it is all the more important to make it as readable and explicit as possible.
For instance, I like checkIfStreamExists better than existStream.

Contributor Author

Will keep in mind for the future, thank you!

initLock.unlock();
}

@Override
Contributor

It is unclear what onError is for. If it's part of the LockListener class, I would assume this method gets invoked during failure cases related to lock acquisition.
However, the code here suggests a different usage of onError.

Contributor Author

Removed.

createIntermediateStreams(intStreams);
LOG.info("Created intermediate streams successfully!");
} catch (Exception e) {
onError();
Contributor

Why invoke onError here? Refer to the comment below about clarity of the onError method.

public void setLockListener(LockListener listener) {
this.zkLockListener = listener;
}

Contributor

nit: extra line

Contributor Author

Removed.

public class ZkLock implements Lock {

public static final Logger LOG = LoggerFactory.getLogger(ZkLock.class);
private final static String LOCK_PATH = "lock";
Contributor

consider renaming this or the lockPath instance variable.

Contributor Author

Renamed to DistributedLock.

if (!hasLock.get()) {
throw new SamzaException("Timed out while acquiring lock for creating intermediate streams.");
}
}, zkConfig.getZkSessionTimeoutMs(), TimeUnit.MILLISECONDS);
Contributor

Do we really need zkConfig? How about taking the timeout in the lock method, or using a hardcoded default, or both :)

Contributor Author

Defined a lock timeout value in LocalApplicationRunner instead of using ZkConfig.



private void createIntermediateStreams(List<StreamSpec> intStreams) throws Exception {
boolean streamsExist = getStreamManager().checkIfStreamsExist(intStreams);
Contributor

Why don't we get the subset of intStreams that have not been created yet and only try to create those? If a stream already exists, we do the work twice: once when checking whether it exists and again during the actual creation, which would be a no-op.

Contributor Author

Discussed offline.

* @param streamSpec The spec, or blueprint for the physical stream on the system.
* @return true if stream exists already, false otherwise
*/
default boolean existStream(StreamSpec streamSpec) {
Contributor

Don't have context on this PR, just wondering why we need a new method in the SystemAdmin API for this? Is this just for convenience or do we not currently have a way to check if a stream exists?

Contributor

Wouldn't the getStreamMetadata API on SystemAdmin be sufficient?

Contributor Author

It seems like we won't be needing this function at all. Hence removed.

@vjagadish1989
Contributor

vjagadish1989 commented Aug 11, 2017

@navina :
Is this for merge into master? Or is the plan to merge this to a separate branch, clean-up and later merge back?

@navina
Contributor

navina commented Aug 11, 2017

@vjagadish1989 This is for 0.14.0 branch.

Contributor

@navina navina left a comment

Suggested a couple of changes. Otherwise, looks good.

Since we are running short of time, I am ok with skipping testing for this PR. I am sure we will add it prior to the 0.14.0 release for which this is targeted.

+1 thanks!

throw new UnsupportedOperationException();
}
}
}
Contributor

nit: can we try to get rid of these minor changes that are seemingly unrelated to the patch?

Contributor Author

Done.

getStreamManager().createStreams(intStreams);
} else {
LOG.error("Timed out while trying to acquire lock");
coordinationUtils.reset();
Contributor

Can you wrap lines from Line 253 in a try-finally block? You should invoke reset even in the case where lock has been successfully acquired and critical section is complete. Adding reset in finally block will guarantee this behavior.
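
The try-finally shape requested above might look roughly like this. It is a sketch under assumptions: `FakeCoordination` is a hypothetical stand-in that only counts `reset()` calls, and `createWithLock` takes the lock outcome as a parameter rather than acquiring a real lock.

```java
class ResetInFinally {
  // Hypothetical stand-in for the coordination utility; only reset() matters here.
  static class FakeCoordination {
    int resets;

    void reset() {
      resets++;
    }
  }

  // Returns true if the streams were created, false if the lock timed out.
  // reset() runs on every path: success, timeout, or an exception in createStreams.
  static boolean createWithLock(boolean lockAcquired, FakeCoordination coordination,
      Runnable createStreams) {
    try {
      if (lockAcquired) {
        createStreams.run();
        return true;
      }
      return false; // timed out while trying to acquire the lock
    } finally {
      coordination.reset();
    }
  }
}
```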

Contributor Author

Done.

public class ZkDistributedLock implements DistributedLock {

public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class);
private final static String LOCK_PATH = "distributed_lock";
Contributor

Can you make this configurable by making it a constructor argument?
Ideally, this should be unique for different deployments of the streamapplication. Can we make this based on the hashcode of intermediate streams list for now? Check out how @bharathkk is doing it in PR #265
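
One way to derive a lock identifier from the intermediate streams list is sketched below. How the referenced PR does this is not shown in this thread, so the scheme here is purely an assumption: `LockIds`, `lockIdFor`, the `lock-` prefix, and the use of plain stream-ID strings are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class LockIds {
  // Builds an identifier from the hash of the sorted stream IDs, so that every
  // processor planning the same set of streams computes the same value
  // regardless of list order.
  static String lockIdFor(List<String> intermediateStreamIds) {
    List<String> sorted = new ArrayList<>(intermediateStreamIds);
    Collections.sort(sorted);
    return "lock-" + Integer.toHexString(sorted.hashCode());
  }
}
```

A 32-bit hash can collide, so a scheme like this reduces the chance of two deployments sharing a lock without ruling it out.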

Contributor Author

Made it configurable. I have reused Bharath's code for this. Kindly let me know if it's okay.

return true;
} else {
// Keep trying to acquire the lock
long currentTime = System.currentTimeMillis();
Contributor

nit: you can get rid of the local variable currentTime

Contributor Author

Done.

} else {
// Keep trying to acquire the lock
long currentTime = System.currentTimeMillis();
if ((currentTime - startTime) >= lockTimeout) {
Contributor

Perhaps you can make this the condition for your while loop?
See if you can accommodate this change. If it doesn't work out, it's fine.
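
Using the elapsed-time check as the loop condition could be sketched as follows. `DeadlineLoop`, `acquireWithin`, and the `tryAcquire` supplier are hypothetical names; the supplier stands in for the actual attempt to take the lock.

```java
import java.util.function.BooleanSupplier;

class DeadlineLoop {
  // Keeps trying to acquire until lockTimeoutMs has elapsed; the timeout check
  // is the loop condition, so no separate local variable or inner branch is needed.
  static boolean acquireWithin(BooleanSupplier tryAcquire, long lockTimeoutMs) {
    long startTime = System.currentTimeMillis();
    while (System.currentTimeMillis() - startTime < lockTimeoutMs) {
      if (tryAcquire.getAsBoolean()) {
        return true;
      }
      try {
        Thread.sleep(10); // brief pause between attempts
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false; // timed out
  }
}
```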

Contributor Author

Done :)

nodePath = null;
LOG.info("Ephemeral lock node deleted. Unlocked!");
} else {
LOG.error("Ephemeral lock node you want to delete doesn't exist");
Contributor

Perhaps change this to warn, since it doesn't actually indicate an error condition.

Contributor Author

Done.

* Releases the lock
*/
void unlock();
}
Contributor

I think it would be helpful to have a close() method in the lock to be able to close the connection (separately from the unlock)

Contributor

@sborya I think that we should wait a bit until we see some more distributed lock implementations. If all distributed lock implementations require init() to open a distributed resource and close() to close it, we should add both APIs to this interface. If not, then init() and close() would be per implementation.

Contributor

@shanthoosh shanthoosh Aug 17, 2017

@nickpan47

When unlock fails, a client of DistributedLock cannot tell whether the failure came from releasing resources or from the unlock implementation itself. A lock owner that wants to release the lock to another processor in the group therefore cannot conclude whether the unlock actually happened, and cannot handle the exception effectively.

Also, with only lock and unlock operations, a client can logically call lock() after unlock(), essentially trying to join again. That would not be possible if resource establishment happens in the constructor or factory.create method instead of in a separate init() lifecycle method.

Since any DistributedLock implementation will need to establish resources with a remote service (ZooKeeper, AzureBlobStore, or anything else), extending AutoCloseable and adding an init() lifecycle method would clearly define boundaries and communicate that intent through our interface (similar to SystemProducer and SystemConsumer).

I’m not suggesting that we should do it, just interested to know your opinion.

Can you please point out why this reasoning is incorrect (or not required for now)?

Please share your thoughts.
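
The lifecycle shape discussed above, with a separate init() and an AutoCloseable close(), might be sketched like this. It is not the interface that was adopted: `ManagedLock`, `RecordingLock`, and `LifecycleExample` are hypothetical, and the recording implementation only logs the order of calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lock interface with explicit resource lifecycle methods.
interface ManagedLock extends AutoCloseable {
  void init(); // would open the connection to the remote service

  boolean lock();

  void unlock();

  @Override
  void close(); // narrowed so callers need not handle a checked exception
}

// Records the order of calls instead of talking to a remote service.
class RecordingLock implements ManagedLock {
  final List<String> events = new ArrayList<>();

  @Override
  public void init() {
    events.add("init");
  }

  @Override
  public boolean lock() {
    events.add("lock");
    return true;
  }

  @Override
  public void unlock() {
    events.add("unlock");
  }

  @Override
  public void close() {
    events.add("close");
  }
}

class LifecycleExample {
  static List<String> run() {
    RecordingLock lock = new RecordingLock();
    // try-with-resources closes the resource even if the critical section throws,
    // keeping resource release separate from unlock().
    try (ManagedLock managed = lock) {
      managed.init();
      if (managed.lock()) {
        try {
          // critical section would go here
        } finally {
          managed.unlock();
        }
      }
    }
    return lock.events;
  }
}
```

Separating close() from unlock() lets a caller tell a failed release of the lock apart from a failed release of the underlying connection.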

Contributor

@shanthoosh please see my comments on the lock interface definition. Your point here is exactly what I tried to say in the interface definition. You can have the lock itself maintain a heartbeat/lifecycle/lease to the remote resource, which may not require init()/close() in that case. However, it seems necessary to define a callback on the lock interface so that the user can react to loss of possession of the remote resource.

Contributor

@nickpan47 nickpan47 left a comment

@PawasChhokra thanks for the patch! I mainly have two comments: 1) try not to tie the implementation of LocalApplicationRunner to any ZK-specific implementations; 2) the DistributedLock interface needs to handle failure cases between lock() and unlock().

Thanks!


Latch getLatch(int size, String latchId);

DistributedLock getLock(String initLockPath);
Contributor

A general comment: in a broader context, lockName sounds better than lockPath. lockPath is more ZK-specific.

import java.util.concurrent.TimeUnit;


public interface DistributedLock {
Contributor

Would be nice to add javadoc for public interface classes.


/**
* Tries to acquire the lock
* @param timeout Duration of lock acquiring timeout.
Contributor

nit: is this the timeout to acquire the lock, or the duration the lock is held once acquired? I think that you were referring to the first. It would be better to state it clearly.

Contributor Author

Yes, this is the timeout to acquire the lock. Will make it more clear in the documentation.

private static final long LEADER_ELECTION_WAIT_TIME_MS = 1000;
// Lock timeout is set to 10 seconds here, as we don't want to introduce a new config value currently.
private static final long LOCK_TIMEOUT_SECONDS = 10;
private static final String ZK_COORDINATION_CLASS = "org.apache.samza.zk.ZkJobCoordinatorFactory";
Contributor

We should avoid letting more and more ZK-specific names and details bleed into the general implementation. LocalApplicationRunner is not just for the ZK-based implementation. Hence, I would recommend not hard-coding any actual ZK implementation classes here in the LocalApplicationRunner.

Contributor

@nickpan47 I agree, but we have to do this string matching because we decided that the user will not configure a CoordinationServiceFactory and that we will derive it from the job.coordinator.factory configuration. Are you recommending that we introduce another configuration?

Contributor

@navina I am OK if the config has a default that is ZK-specific. I am against hard-coding ZK-specific class names in the LocalApplicationRunner implementation. I would suggest that LocalApplicationRunner load the CoordinatorServiceFactory via Utils.getInstance(JobConfig.getJobCoordinatorFactory()), without hard-coding the ZK job coordinator class names.
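
Loading a factory from a configured class name, rather than matching on a hard-coded one, can be sketched with plain reflection. This does not reproduce the Samza utility named in the comment; `FactoryLoader`, `CoordinationFactory`, and `InMemoryCoordinationFactory` are hypothetical, and the class name would come from configuration in practice.

```java
// Hypothetical factory interface; the runner depends only on this type.
interface CoordinationFactory {
  String name();
}

// One possible implementation, selected purely by its class name.
class InMemoryCoordinationFactory implements CoordinationFactory {
  @Override
  public String name() {
    return "in-memory";
  }
}

class FactoryLoader {
  // Instantiates className through its no-argument constructor and checks
  // that the result implements the expected type.
  static <T> T getInstance(String className, Class<T> expectedType) {
    try {
      Object instance = Class.forName(className).getDeclaredConstructor().newInstance();
      return expectedType.cast(instance);
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate " + className, e);
    }
  }
}
```

The caller never names a concrete implementation, so adding a non-ZK coordination service would only require a different configured value.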

if (ZK_COORDINATION_CLASS.equals(jobCoordinatorFactoryClassName)) {
ApplicationConfig appConfig = new ApplicationConfig(config);
return new ZkCoordinationServiceFactory().getCoordinationService(
return Util.<CoordinationServiceFactory>getObj(ZK_COORDINATION_SERVICE_CLASS).getCoordinationService(
Contributor

Please remove the ZK-specific code here.

* @throws TimeoutException exception for lock timeout
*/
/* package private */ void createStreams(String planId, List<StreamSpec> intStreams) throws TimeoutException {
/* package private */ void createStreams(String planId, List<StreamSpec> intStreams) throws TimeoutException {
Contributor

nit: unnecessary extra leading white space.

int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath));

if (children.size() == 0 || index == -1) {
throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
Contributor

I don't see any error-handling logic here (e.g. trying to reconnect before the session expires). Is it part of the contract that the caller of DistributedLock.lock() handles those exceptions? If yes, we need to define the exception types that DistributedLock.lock() can throw and clearly document what the caller is supposed to do when it catches them. Otherwise, the implementer of lock() should handle the specific exceptions internally (i.e. retry) until the timeout.

return true;
} else {
try {
Thread.sleep(random.nextInt(1000));
Contributor

I thought that in ZK implementation, we can use watch() on children?

@Override
public void unlock() {
if (nodePath != null) {
zkUtils.getZkClient().delete(nodePath);
Contributor

So, I would recommend thinking through the zkClient lifecycle needed in this lock implementation. Are we assuming that the zkClient is initialized by some other class and just passed in to this lock implementation? Or are we assuming that the lock internally owns the lifecycle of the zkClient? If we are assuming the first case, I think that this DistributedLock implementation lacks handling for the failure cases: what if the zkClient connection to the ZK server is dropped after the user has called lock() and proceeded into the critical section? If the disconnect lasts longer than the session timeout, there would be race conditions. How are users of the DistributedLock notified of this issue after calling lock()?

@prateekm
Contributor

@PawasChhokra Does this still need to be merged? If not, can we close this PR?

@PawasChhokra
Contributor Author

@prateekm This has been implemented already. Closing the pull request now. Thanks for the reminder!

8 participants