Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@
import com.google.inject.Inject;
import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.curator.CuratorUtils;
import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.curator.announcement.NodeAnnouncer;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
Expand All @@ -54,7 +53,7 @@ public class WorkerCuratorCoordinator
private final ObjectMapper jsonMapper;
private final RemoteTaskRunnerConfig config;
private final CuratorFramework curatorFramework;
private final Announcer announcer;
private final NodeAnnouncer announcer;

private final String baseAnnouncementsPath;
private final String baseTaskPath;
Expand All @@ -77,7 +76,7 @@ public WorkerCuratorCoordinator(
this.curatorFramework = curatorFramework;
this.worker = worker;

this.announcer = new Announcer(curatorFramework, Execs.directExecutor());
this.announcer = new NodeAnnouncer(curatorFramework);

this.baseAnnouncementsPath = getPath(Arrays.asList(indexerZkConfig.getAnnouncementsPath(), worker.getHost()));
this.baseTaskPath = getPath(Arrays.asList(indexerZkConfig.getTasksPath(), worker.getHost()));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,349 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.curator.announcement;

import com.google.common.annotations.VisibleForTesting;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.ZKPathsUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* NodeAnnouncer announces a single node in ZooKeeper and watches only that node,
* whereas {@link Announcer} watches all child paths, not only the announced node.
*/
public class NodeAnnouncer
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NodeAnnouncer and Announcer share a lot of code. Is it possible to extract a common base class and either extend it as Announcer and NodeAnnouncer, or just add a flag to that base class's state and use something like BaseAnnouncer(watchAll=true) as Announcer and BaseAnnouncer(watchAll=false) as NodeAnnouncer?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add concurrent control flow documentation to this class? From what threads (thread pools, executors) each method may be called?

{
private static final Logger log = new Logger(NodeAnnouncer.class);

private final CuratorFramework curator;

// in case a path is added to `toAnnounce` in announce() before zk is connected,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use Javadoc /** ... */ comments for fields.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"In case", missing space.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this right that it's implied in this comment that announce() may be called concurrently by somebody? How is this possible before start()? Could you please add concurrent control flow documentation to announce() and start() methods?

// should remember the path and do announce in start() later
private final List<Announceable> toAnnounce = new ArrayList<>();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's clearer to call it toAnnounceInStart?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please annotate @GuardedBy("toAnnounce"). Could you create a separate lock object in a field called "lock"?

// in case a path is added to `toUpdate` in update() before zk is connected,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"In case"

// should remember the path and do update in start() later
private final List<Announceable> toUpdate = new ArrayList<>();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this different from toAnnounce?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toUpdate is added to when update() is called before zk is connected; the path to update may or may not have been added to toAnnounce before.
Most of NodeAnnouncer's logic follows the original Announcer.java.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please annotate @GuardedBy("toAnnounce").

private final ConcurrentMap<String, NodeCache> listeners = new ConcurrentHashMap<>();
private final ConcurrentMap<String, byte[]> announcedPaths = new ConcurrentHashMap<>();
// only the announcer that created a parent path may drop it, so remember the parents created here
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"only who creates the parent path can drop the parent path" - "Who" here is a thread? A class? Is this rule imposed by the Curator framework?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"remember"

private final List<String> parentsIBuilt = new CopyOnWriteArrayList<String>();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name and the purpose of this field are not clear.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is clearer to call it pathsCreatedInThisAnnouncer? Is there anything specific about them being "parents"? They are parents of what?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This list is iterated only once. There is no win in using CopyOnWriteArrayList rather than a simple ArrayList protected by a lock.


private boolean started = false;

/**
 * Creates an announcer bound to the given Curator client. The constructor only stores the
 * reference; it performs no ZooKeeper operation itself.
 *
 * @param curator the Curator client that this announcer uses for its ZooKeeper calls
 */
public NodeAnnouncer(CuratorFramework curator)
{
this.curator = curator;
}

/**
 * Exposes, for tests, the paths currently recorded in {@code announcedPaths}.
 *
 * @return the key set of {@code announcedPaths}; this is the map's own key-set view, not a copy
 */
@VisibleForTesting
Set<String> getAddedPaths()
{
return announcedPaths.keySet();
}

/**
 * Marks this announcer as started and replays every call that was queued while it was not started.
 *
 * <p>The whole method body runs while holding the monitor of {@code toAnnounce}. If the announcer is
 * already started, nothing happens beyond the log line. Otherwise the queued announce() calls are
 * replayed first and the queued update() calls second, and each queue is emptied after its replay.
 */
@LifecycleStart
public void start()
{
  log.info("Starting announcer");
  synchronized (toAnnounce) {
    if (!started) {
      // Flip the flag before replaying so that announce()/update() act immediately instead of re-queueing.
      started = true;

      for (Announceable pending : toAnnounce) {
        announce(pending.path, pending.bytes, pending.removeParentsIfCreated);
      }
      toAnnounce.clear();

      for (Announceable pending : toUpdate) {
        update(pending.path, pending.bytes);
      }
      toUpdate.clear();
    }
  }
}

@LifecycleStop
public void stop()
{
log.info("Stopping announcer");
synchronized (toAnnounce) {
if (!started) {
return;
}

started = false;

Closer closer = Closer.create();
for (NodeCache cache : listeners.values()) {
closer.register(cache);
}
CloseQuietly.close(closer);

for (String announcementPath : announcedPaths.keySet()) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entries are removed via map.remove() while iterating over the map. I cannot find proof that this is safe; it would be better to remove them via iterator.remove().

unannounce(announcementPath);
}

if (!parentsIBuilt.isEmpty()) {
CuratorTransaction transaction = curator.inTransaction();
for (String parent : parentsIBuilt) {
try {
transaction = transaction.delete().forPath(parent).and();
}
catch (Exception e) {
log.error(e, "Unable to delete parent[%s].", parent);
}
}
try {
((CuratorTransactionFinal) transaction).commit();
}
catch (Exception e) {
log.error(e, "Unable to commit transaction.");
}
}
}
}

/**
 * Announces {@code bytes} at {@code path}; shorthand for {@code announce(path, bytes, true)}.
 *
 * @param path  the path to announce at
 * @param bytes the payload to announce
 */
public void announce(String path, byte[] bytes)
{
announce(path, bytes, true);
}

/**
* Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node
* and monitor it to make sure that it always exists until it is unannounced or this object is closed.
*
* @param path The path to announce at
* @param bytes The payload to announce
* @param removeParentIfCreated if true and this announcer creates the parent node of "path", that parent is recorded and deleted again in stop()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The explanation of this parameter is not clear.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part of code is following the original Announcer's code.
this means only who created the parent path should do the remove. According to the git log, this code is added to fix a bug.

*/
public void announce(String path, byte[] bytes, boolean removeParentIfCreated)
{
synchronized (toAnnounce) {
if (!started) {
toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated));
return;
}
}

final String parentPath = ZKPathsUtils.getParentPath(path);
boolean buildParentPath = false;

byte[] value = announcedPaths.get(path);

if (value == null) {
try {
if (curator.checkExists().forPath(parentPath) == null) {
buildParentPath = true;
}
}
catch (Exception e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to catch a narrower exception type?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the curator.checkExists().forPath(parentPath) throws Exception in method signature, so we have to catch Exception

log.debug(e, "Problem checking if the parent existed, ignoring.");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ignoring?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this part of code is following the original Announcer's code.

when this exception happens: 1, if the parent path exists, this exception can be ignored. 2 if the parent path doesn't exist, the following code will throw exception due to failed to add path

}

// Synchronize to make sure that I only create a listener once.
synchronized (toAnnounce) {
if (!listeners.containsKey(path)) {
final NodeCache cache = new NodeCache(curator, path, true);
cache.getListenable().addListener(
new NodeCacheListener()
{
@Override
public void nodeChanged() throws Exception
{
ChildData currentData = cache.getCurrentData();
if (currentData == null) {
final byte[] value = announcedPaths.get(path);
if (value != null) {
log.info("Node[%s] dropped, reinstating.", path);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this mean that the node was dropped?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like someone dropped it from zookeeper cli

createAnnouncement(path, value);
}
}
}
}
);

if (started) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the started flag is already checked at the beginning of this method; why is it checked again here?

if (buildParentPath) {
createPath(parentPath, removeParentIfCreated);
}
startCache(cache);
listeners.put(path, cache);
}
}
}
}

boolean created = false;
synchronized (toAnnounce) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Several distinct synchronized blocks on the same variable in the same method don't seem to make a lot of sense to me.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kaijianding could you please explain?

if (started) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that the started flag is already checked at the beginning of this method; why is it checked again here?

byte[] oldBytes = announcedPaths.putIfAbsent(path, bytes);

if (oldBytes == null) {
created = true;
} else if (!Arrays.equals(oldBytes, bytes)) {
throw new IAE("Cannot reannounce different values under the same path");
}
}
}

if (created) {
try {
createAnnouncement(path, bytes);
}
catch (Exception e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to narrow the exception type?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the curator.checkExists().forPath(parentPath) throws Exception in method signature, so we have to catch Exception

throw new RuntimeException(e);
}
}
}

public void update(final String path, final byte[] bytes)
{
synchronized (toAnnounce) {
if (!started) {
// removeParentsIfCreated is not relevant for updates; use dummy value "false".
toUpdate.add(new Announceable(path, bytes, false));
return;
}
}

byte[] oldBytes = announcedPaths.get(path);

if (oldBytes == null) {
throw new ISE("Cannot update a path[%s] that hasn't been announced!", path);
}

synchronized (toAnnounce) {
try {
if (!Arrays.equals(oldBytes, bytes)) {
announcedPaths.put(path, bytes);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a race condition is possible between announcedPaths.get() and announcedPaths.put(), is that benign? If not, please rewrite the code using map.compute().

updateAnnouncement(path, bytes);
}
}
catch (Exception e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to narrow the exception type?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the curator.checkExists().forPath(parentPath) throws Exception in method signature, so we have to catch Exception

throw new RuntimeException(e);
}
}
}

/**
 * Submits a request to create an ephemeral node at {@code path} holding {@code value}. The request
 * is built with Curator's {@code compressed()} and {@code inBackground()} options.
 *
 * @param path  the node to create
 * @param value the payload to store in the node
 * @return whatever {@code forPath} returns for this builder chain; the callers in this class discard it.
 *         NOTE(review): for a background create this is presumably null -- confirm against Curator docs.
 * @throws Exception propagated from the Curator call
 */
private String createAnnouncement(final String path, byte[] value) throws Exception
{
return curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path, value);
}

/**
 * Submits a request to overwrite the data of the node at {@code path} with {@code value}. The request
 * is built with Curator's {@code compressed()} and {@code inBackground()} options.
 *
 * @param path  the node whose data is replaced
 * @param value the new payload
 * @return whatever {@code forPath} returns for this builder chain; the caller in this class discards it.
 *         NOTE(review): for a background setData this is presumably null -- confirm against Curator docs.
 * @throws Exception propagated from the Curator call
 */
private Stat updateAnnouncement(final String path, final byte[] value) throws Exception
{
return curator.setData().compressed().inBackground().forPath(path, value);
}

/**
* Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer
* will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions.
* <p>
* If you need to completely clear all the state of what is being watched and announced, stop() the Announcer.
*
* @param path the path to unannounce
*/
public void unannounce(String path)
{
log.info("unannouncing [%s]", path);
final byte[] value = announcedPaths.remove(path);

if (value == null) {
log.error("Path[%s] not announced, cannot unannounce.", path);
return;
}

try {
curator.inTransaction().delete().forPath(path).and().commit();
}
catch (KeeperException.NoNodeException e) {
log.info("node[%s] didn't exist anyway...", path);
}
catch (Exception e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the curator.xxx.forPath(parentPath) throws Exception in method signature, so we have to catch Exception

throw new RuntimeException(e);
}
}

private void startCache(NodeCache cache)
{
try {
cache.start();
}
catch (Exception e) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the NodeCache.start() throws Exception in method signature, so we have to catch Exception

CloseQuietly.close(cache);
throw new RuntimeException(e);
}
}

private void createPath(String parentPath, boolean removeParentsIfCreated)
{
try {
curator.create().creatingParentsIfNeeded().forPath(parentPath);
if (removeParentsIfCreated) {
parentsIBuilt.add(parentPath);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The path is added to the remove list regardless of whether it was actually created two lines above or already existed?

}
log.debug("Created parentPath[%s], %s remove on stop() called.", parentPath, removeParentsIfCreated ? "will" : "will not");
}
catch (Exception e) {
log.error(e, "Problem creating parentPath[%s], someone else created it first?", parentPath);
}
}

/**
 * Holder for the arguments of an announce() or update() call that was made before the announcer was
 * started, so that start() can replay the call later.
 */
private static class Announceable
{
// Path the queued call was made for.
final String path;
// Payload passed with the queued call.
final byte[] bytes;
// Flag passed to announce(); queued update() calls store a dummy false here.
final boolean removeParentsIfCreated;

/**
 * @param path                   path of the queued call
 * @param bytes                  payload of the queued call
 * @param removeParentsIfCreated the announce() flag to replay with (not used when replaying updates)
 */
public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated)
{
this.path = path;
this.bytes = bytes;
this.removeParentsIfCreated = removeParentsIfCreated;
}
}
}
Loading