
Synchronization of lookups during startup of druid processes#4758

Merged
himanshug merged 35 commits intoapache:masterfrom
a2l007:synclookups
Oct 13, 2017

Conversation

@a2l007
Contributor

@a2l007 a2l007 commented Sep 6, 2017

This introduces lookup synchronization during startup of Druid processes. The main motivation is to cut the delay incurred when lookup loading is performed by the Coordinator, so that realtime tasks have lookups available immediately.
Upon startup, each process asks the Coordinator which lookups to load and loads them before finishing its start sequence, so every process has its lookups loaded before it starts serving queries. If a process cannot reach the Coordinator (Coordinator unavailable), it falls back to the snapshot directory, if one exists.
Failing to load lookups does not fail startup of the process. The high-level changes are:

  1. Add synchronization of lookups in LookupReferencesManager
  2. Parallelize the lookup loading process in LookupReferencesManager
  3. Move LookupReferencesManager and associated classes to druid-server module. This was necessary as LookupReferencesManager required access to LookupListeningAnnouncerConfig for tier related information and this was not possible from the druid-processing module.
  4. Introduce a flag which can be used to disable the lookup synchronization workflow on startup.
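
The fallback order described above (Coordinator first, then snapshot, and never failing startup) can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: the class name, the `Supplier`-based sources, and the use of plain lookup names instead of `LookupBean` are all assumptions made for the example.

```java
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch; not the LookupReferencesManager implementation.
final class LookupStartupSketch {
    /**
     * Resolves the lookups to load at startup: Coordinator first, then the
     * snapshot, then an empty list. It never throws, so a lookup failure
     * cannot fail process startup.
     */
    static List<String> resolveLookups(
        Supplier<List<String>> coordinator, // assumed: returns null or throws when unavailable
        Supplier<List<String>> snapshot     // assumed: returns null when no snapshot exists
    ) {
        List<String> result = tryGet(coordinator);
        if (result == null) {
            result = tryGet(snapshot);
        }
        return result == null ? Collections.<String>emptyList() : result;
    }

    // Treats any runtime failure of a source as "nothing available".
    private static List<String> tryGet(Supplier<List<String>> source) {
        try {
            return source.get();
        } catch (RuntimeException e) {
            return null;
        }
    }
}
```

Returning an empty list rather than propagating the error is what keeps the start sequence going when neither source is available.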

private final String snapshotWorkingDir;

@JsonProperty
private int numLookupLoadingThreads = 10;
Contributor

Can you update the documentation with this, and also mention that this thread pool is created only at startup and destroyed afterwards? It is not kept for the lifetime of the JVM.

private int numLookupLoadingThreads = 10;

@JsonProperty
private boolean disableLookupSync = false;
Contributor

disableLookupSyncOnStartup ?

Contributor

beyond the startup, they are always synced.

LockSupport.parkNanos(LookupReferencesManager.this, TimeUnit.MINUTES.toNanos(1));
}
catch (Throwable t) {
LOG.makeAlert(t, "Error occured while lookup notice handling.").emit();
Contributor

is this just indentation change? ... why did it change ?

Contributor Author

The indentation changed when I changed the Runnable right above it.

String tier = lookupListeningAnnouncerConfig.getLookupTier();
List<LookupBean> lookupBeanList = new ArrayList<>();
// Check if the coordinator is accessible
if (getCoordinatorUrl().isEmpty()) {
Contributor

can you put this check inside getLookupListFromCoordinator(tier) instead ?

ListeningScheduledExecutorService executorService = MoreExecutors.listeningDecorator(
Executors.newScheduledThreadPool(
lookupConfig.getNumLookupLoadingThreads(),
Execs.makeThreadFactory("LookupReferencesManager--%s")
Contributor

Can we name it differently from the other, long-running one, e.g. LookupReferencesManager-startup-%s?


LookupExtractorFactoryContainer container = lookupBean.getContainer();
if (container.getLookupExtractorFactory().start()) {
LOG.info(
Contributor

Can we add this log before calling start(), e.g. "Starting lookup..."? If some start() implementation gets stuck for some reason, the logs would then tell which lookup is stuck in start().

stateRef.set(new LookupUpdateState(ImmutableMap.of(), ImmutableList.of(), ImmutableList.of()));
}
} else {
LOG.info("Lookup synchronization is disabled");
Contributor

can you change it to "Lookup loading on startup is disabled" .

response.getStatus(),
response.getContent()
);
if (lookupSnapshotTaker != null) {
Contributor

this check does not belong here, this should just return null.

response.getContent()
);
if (lookupSnapshotTaker != null) {
LOG.info("Attempting to load saved snapshot");
Contributor

this also

} else {
// Older version of getSpecificTier returns a list of lookup names.
// Lookup loading is performed via snapshot if older version is present.
if (response.getContent().startsWith("[")) {
Contributor

@himanshug himanshug Sep 6, 2017

can you update comment to say, this check should be removed in a future release and is there only for backward compatibility?

stateRef.set(new LookupUpdateState(ImmutableMap.of(), ImmutableList.of(), ImmutableList.of()));
private List<LookupBean> getLookupListFromSnapshot()
{
return lookupSnapshotTaker.pullExistingSnapshot();
Contributor

we should do the null check for lookupSnapshotTaker here

catch (Exception e) {
LOG.error("Error encountered while connecting to Coordinator");
}
return "";
Contributor

return null ?

Contributor

and also this should probably be inside the catch block

final Server instance = selector.pick();
if (instance == null) {
LOG.info("Coordinator instance unavailable.");
return "";
Contributor

return null ?

).get();
}

private LookupExtractorFactoryContainer getLookupEntryFromCode(String tier, String lookupCode)
Contributor

this is not needed probably.

@JsonSubTypes.Type(name = "timeFormat", value = TimeFormatExtractionFn.class),
@JsonSubTypes.Type(name = "identity", value = IdentityExtractionFn.class),
@JsonSubTypes.Type(name = "lookup", value = LookupExtractionFn.class),
@JsonSubTypes.Type(name = "registeredLookup", value = RegisteredLookupExtractionFn.class),
Contributor

why is this removed?

@JsonSubTypes.Type(name = "extraction", value = ExtractionDimensionSpec.class),
@JsonSubTypes.Type(name = "regexFiltered", value = RegexFilteredDimensionSpec.class),
@JsonSubTypes.Type(name = "listFiltered", value = ListFilteredDimensionSpec.class),
@JsonSubTypes.Type(name = "lookup", value = LookupDimensionSpec.class)
Contributor

why is this removed?

@a2l007
Contributor Author

a2l007 commented Sep 7, 2017

@himanshug Suggested changes have been made. Please review.

@a2l007
Contributor Author

a2l007 commented Sep 13, 2017

Requests to coordinator are now done via DruidLeaderClient.

Comment thread docs/content/querying/lookups.md Outdated
|Property|Description|Default|
|--------|-----------|-------|
|`druid.lookup.snapshotWorkingDir`| Working path used to store snapshot of current lookup configuration, leaving this property null will disable snapshot/bootstrap utility|null|
|`druid.lookup.numLookupLoadingThreads`| Number of threads for loading the lookups in parallel.This threadpool is created only at the time of start and destroyed once startup is done. It is not kept during the lifetime of the JVM.|10|
Contributor

nit: "..lookups in parallel on startup. .."

private int numLookupLoadingThreads = 10;

@JsonProperty
private boolean disableLookupSyncOnStartup = false;
Contributor

can you document this as well ?


LookupExtractorFactoryContainer container = lookupBean.getContainer();
LOG.info(
"Started lookup [%s]:[%s]",
Contributor

s/Started/Starting

container
);
if (container.getLookupExtractorFactory().start()) {
return new AbstractMap.SimpleImmutableEntry<>(lookupBean.getName(), container);
Contributor

add a log here with "Started lookup..."

}
stateRef.set(new LookupUpdateState(builder.build(), ImmutableList.of(), ImmutableList.of()));
}
catch (Exception e) {
Contributor

We should set the thread back to the interrupted state if the exception was an InterruptedException.
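
A minimal sketch of the interrupt-restoring pattern this comment asks for, assuming a broad `catch (Exception e)` like the one in the hunk above. The class and method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of restoring the interrupt flag inside a broad catch block.
final class InterruptRestoreSketch {
    /** Runs the task; returns false on any failure, restoring the interrupt flag if interrupted. */
    static boolean runRestoringInterrupt(Callable<Void> task) {
        try {
            task.call();
            return true;
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Catching the exception cleared the signal; set the flag again so callers can see it.
                Thread.currentThread().interrupt();
            }
            return false;
        }
    }
}
```

Without the `interrupt()` call, code further up the stack would have no way to know that the thread had been asked to stop.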

LOG.error(e, "Failed to finish lookup load process.");
}
executorService.shutdownNow();
} else {
Contributor

Can you also wait for executorService to finish using executorService.awaitTermination(1 minute)? If it doesn't finish within that duration, log a warning.
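
One way the requested shutdown could look, as a sketch only: the helper name and the use of `System.err` in place of Druid's logger are assumptions, while `shutdownNow()` and `awaitTermination(long, TimeUnit)` are standard `ExecutorService` methods.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; the real code would use the class's own logger.
final class StartupPoolShutdownSketch {
    /** Stops the pool and waits up to the timeout; returns true if it terminated in time. */
    static boolean shutdownAndWait(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdownNow();
        try {
            boolean terminated = pool.awaitTermination(timeout, unit);
            if (!terminated) {
                System.err.println("WARN: lookup loading threads did not terminate within the timeout");
            }
            return terminated;
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A call such as `shutdownAndWait(executorService, 1, TimeUnit.MINUTES)` would match the one-minute wait suggested in the comment.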

{
ImmutableMap.Builder<String, LookupExtractorFactoryContainer> builder = ImmutableMap.builder();
ListeningScheduledExecutorService executorService = MoreExecutors.listeningDecorator(
Executors.newScheduledThreadPool(
Member

Could use Execs.multiThreaded()

private void startLookups(List<LookupBean> lookupBeanList)
{
ImmutableMap.Builder<String, LookupExtractorFactoryContainer> builder = ImmutableMap.builder();
ListeningScheduledExecutorService executorService = MoreExecutors.listeningDecorator(
Member

Could you please rewrite it using standard Java API, ExecutorCompletionService?

Contributor Author

I see that ScheduledExecutorService is not required here. I have changed it to ListeningExecutorService. Is this sufficient or do I still need to refactor it to use ExecutorCompletionService?

Member

I suggest using ExecutorCompletionService because it is standard Java API that replaces the Guava functionality you used. That Guava functionality has a good chance of being deprecated and removed in the future, which would add compatibility problems.

Because of this dumb Guava's compatibility policy (remove things), we better use Guava as little as possible.
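
For readers unfamiliar with it, here is a rough sketch of parallel loading with `ExecutorCompletionService`, using only the standard library. It is not the code that was merged: the class name, the `Callable<String>` loaders standing in for lookup start-up, and the choice to skip a failed loader rather than abort are assumptions made for the illustration.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of loading named items in parallel on a short-lived pool.
final class ParallelLookupLoadSketch {
    static Map<String, String> loadAll(Map<String, Callable<String>> loaders, int numThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        CompletionService<Map.Entry<String, String>> completion = new ExecutorCompletionService<>(pool);
        List<Future<Map.Entry<String, String>>> futures = new ArrayList<>();
        Map<String, String> loaded = new HashMap<>();
        try {
            for (Map.Entry<String, Callable<String>> e : loaders.entrySet()) {
                final String name = e.getKey();
                final Callable<String> loader = e.getValue();
                Callable<Map.Entry<String, String>> task =
                    () -> new AbstractMap.SimpleImmutableEntry<String, String>(name, loader.call());
                futures.add(completion.submit(task));
            }
            // Collect results in completion order, one take() per submitted task.
            for (int i = 0; i < futures.size(); i++) {
                try {
                    Map.Entry<String, String> result = completion.take().get();
                    loaded.put(result.getKey(), result.getValue());
                } catch (ExecutionException ex) {
                    // Assumed policy: a failed loader is skipped, the rest continue.
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            return loaded;
        } finally {
            // Cancelling a finished future is a no-op, so no isDone() check is needed.
            for (Future<?> f : futures) {
                f.cancel(true);
            }
            pool.shutdownNow();
        }
    }
}
```

The pool is created and torn down inside the method, matching the earlier request that the loading threads exist only during startup.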

leventov added a commit to metamx/druid that referenced this pull request Oct 4, 2017
Squashed commit of the following:

commit 24eb9fb
Merge: eae35a5 07aa405
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Tue Oct 3 21:40:03 2017 -0500

    Merge branch 'master' of https://github.com/druid-io/druid into synclookups

commit eae35a5
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Tue Oct 3 21:39:59 2017 -0500

    Update LRM

commit 37ae0c9
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Tue Oct 3 11:46:38 2017 -0500

    Make executorservice local

commit fc9d53e
Merge: bff5b09 c19cd23
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Tue Oct 3 11:45:31 2017 -0500

    Merge branch 'master' of https://github.com/druid-io/druid into synclookups

commit bff5b09
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Mon Oct 2 21:37:10 2017 -0500

    Add tests to LookupConfig

commit 1cb6627
Merge: 2ea72af 6f91d9c
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Mon Oct 2 19:45:07 2017 -0500

    Merge branch 'master' of https://github.com/druid-io/druid into synclookups

commit 2ea72af
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Mon Oct 2 08:57:58 2017 -0500

    Revert Lookupconfig constructor changes

commit c7ecfdc
Merge: 9876ae0 1f2074c
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Thu Sep 28 17:14:08 2017 -0500

    Merge branch 'master' of https://github.com/druid-io/druid into synclookups

commit 9876ae0
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Thu Sep 28 17:14:02 2017 -0500

    Remove lookup config constructor

commit 99c6f60
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Thu Sep 28 11:50:53 2017 -0500

    Refactoring and doc changes

commit ce32e04
Merge: 361d255 2c30d5b
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Thu Sep 28 08:14:39 2017 -0500

    Merge branch 'master' of https://github.com/druid-io/druid into synclookups

commit 361d255
Merge: e5fe3c2 c3fbe51
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Thu Sep 28 08:14:35 2017 -0500

    Doc changes

commit e5fe3c2
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Wed Sep 20 21:36:16 2017 -0500

    Update LookupConfig

commit 6d9278f
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Wed Sep 20 21:33:31 2017 -0500

    Move executorservice shutdown to finally block

commit 263dfdc
Merge: 727a706 a36adc6
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Wed Sep 20 20:54:04 2017 -0500

    Merge branch 'master' of https://github.com/druid-io/druid into synclookups

commit 727a706
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Fri Sep 15 08:59:12 2017 -0500

    Rename flag

commit 1db8914
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Fri Sep 15 08:37:33 2017 -0500

    Update docs

commit 756098b
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Thu Sep 14 22:45:57 2017 -0500

    Make disablelookups flag true by default

commit e9b3ddc
Merge: f3ad262 d37be5e
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Thu Sep 14 21:43:28 2017 -0500

    Merge branch 'master' of https://github.com/druid-io/druid into synclookups

commit f3ad262
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Wed Sep 13 16:01:00 2017 -0500

    Wait before thread shutdown

commit 4a11a49
Merge: 6731438 4f6eb47
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Wed Sep 13 15:32:19 2017 -0500

    Merge branch 'master' of https://github.com/druid-io/druid into synclookups

commit 6731438
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Wed Sep 13 11:15:01 2017 -0500

    Change coordinator instance to be retrieved by DruidLeaderClient

commit a3bfc31
Merge: 6eaffd4 587f180
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Wed Sep 13 10:52:39 2017 -0500

    Merge branch 'master' of https://github.com/druid-io/druid into synclookups

commit 6eaffd4
Merge: 317e4e3 3a29521
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Tue Sep 12 16:13:32 2017 -0500

    Merge branch 'master' of https://github.com/druid-io/druid into synclookups

commit 317e4e3
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Wed Sep 6 20:56:53 2017 -0500

    Minor refactors and doc update

commit 8f2a360
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Wed Sep 6 15:58:35 2017 -0500

    Refactor of Lookup classes

commit 5d41b29
Author: Atul Mohan <atulmohan@yahoo-inc.com>
Date:   Wed Sep 6 15:12:08 2017 -0500

    Changes for lookup synchronization

"LookupReferencesManager-Startup-%s"
);
ExecutorCompletionService completionService = new ExecutorCompletionService(executorService);
try {
Contributor Author

@leventov ExecutorCompletionService is being used in the latest commit. Please review.

builder.put(lookupResult);
}
}
catch (InterruptedException | ExecutionException e) {
Member

On InterruptedException, it should fall through to the catch (Exception e) { block below. And cancel all the remaining futures.

Contributor Author

I have cancelled the remaining futures on ExecutionException.

}
);
}
IntStream.range(0, lookupBeanList.size())
Member

Please just use a loop; it will also make it easier to fall through with InterruptedException below.

catch (Exception e) {
LOG.error(e, "Failed to finish lookup load process.");
for (Future future : futures) {
if (!future.isDone()) {
Member

isDone not needed -- see example in ExecutorCompletionService's javadoc.

Contributor Author

Removed.

}
private List<LookupBean> getLookupsListFromLookupConfig()
{
List<LookupBean> lookupBeanList = new ArrayList<>();
Member

Pointless assignment

Contributor Author

Removed

// Lookup loading is performed via snapshot if older version is present.
// This check is only for backward compatibility and should be removed in a future release
if (response.getContent().startsWith("[")) {
return null;
Member

Please add a logging message here. This is a very confusing situation in practice: it just says "coordinator not available".
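
To illustrate the kind of message being requested, here is a small sketch of the backward-compatibility check with an explicit log line. The class name, the message wording, and the `trim()` call are assumptions; only the `startsWith("[")` test comes from the hunk above, where an older Coordinator is described as returning a list of lookup names.

```java
// Hypothetical sketch; the message wording and trim() are assumptions.
final class LegacyResponseCheckSketch {
    /** Returns true when the response body looks like the older list-of-names format. */
    static boolean isLegacyNameList(String content) {
        boolean legacy = content != null && content.trim().startsWith("[");
        if (legacy) {
            // Say why the response is ignored, instead of a generic "coordinator not available".
            System.err.println(
                "Coordinator returned a list of lookup names (older version); falling back to snapshot");
        }
        return legacy;
    }
}
```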

@leventov
Member

@a2l007 could you please finish this?

@a2l007
Contributor Author

a2l007 commented Oct 12, 2017

@leventov I have updated the logging as requested. Do you have any pending feedback?

@leventov
Member

@himanshug @drcrallen more comments?

@himanshug
Contributor

LGTM

@himanshug himanshug merged commit c07678b into apache:master Oct 13, 2017
@a2l007 a2l007 deleted the synclookups branch October 16, 2017 13:53
@a2l007 a2l007 restored the synclookups branch October 17, 2017 17:25
@gianm gianm mentioned this pull request Nov 2, 2017
@jon-wei jon-wei added this to the 0.11.0 milestone Nov 16, 2017
leventov added a commit to metamx/druid that referenced this pull request Jun 3, 2018
…mit was one in the series of attempts to fix the problem with QTL startup, that was ultimately fixed by apache#4758. So this commit is probably not needed anymore, but I’m not 100% sure. In our Druid 0.11.0 deployments, this commit was definitely harmless, so not risking and moving this commit to `0.12.1-mmx` for the sake of stability.