Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@
import org.joda.time.Duration;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -58,7 +57,7 @@ public class CoordinatorRuleManager
private final ObjectMapper jsonMapper;
private final Supplier<TieredBrokerConfig> config;

private final AtomicReference<ConcurrentHashMap<String, List<Rule>>> rules;
private final AtomicReference<Map<String, List<Rule>>> rules;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be an AtomicReference of an ImmutableMap otherwise we can not guarantee that no one updates the map after it is set here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. The value type (List<Rule>) should be immutable as well. I'm ok with using UnmodifiableMap and UnmodifiableList.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 - I missed that.

Perhaps the best approach here is to keep the definition of the variable to AtomicReference<Map<String, List<Rule>>>

And add 2 unit tests to ensure
1 - we can not add to/ remove from the map
2 - we can not mutate the rules for an entry in a map.

This way if someone changes this behavior in the future we can catch it easily, and changing the type of Map/ List is easy as well.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, I didn't mean the value type of rules should be UnmodifiableList<Rule>.

@suneet-s do you regard your comment as a blocker for this PR? Those tests would be definitely nice to have, but since any tests haven't rewritten for this class, I'm fine with adding all necessary tests including the ones you mentioned in a follow-up PR at once. @zhenxiao you can do in this PR if you want though.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jihoonson I think we need some way to verify that this change works and will not break in the future.

I understand it's not any worse than before, but I don't think we'll ever come back to add tests for this until we hit a bug - at which point it will be really hard to debug. It should be relatively quick to add those 2 tests so I think it's worth it. If there's a fast follow, that should be ok.

I'll leave it to you for final decision on whether this should be a blocker or not (I'm not sure what the apache rules are), but I think debugging this if someone changes the code to mutate the map would be very tough - I'd strongly recommend we add tests here to prevent that pain in the future.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zhenxiao would you consider @suneet-s's comment above? I think it would be nice to have. If you want to add them in this PR, I think you may need to create a new class, CoordinatorRuleManagerTest, and expose the rules via a method annotated with @VisibleForTesting to verify that the map inside the AtomicRefence is immutable. Another option could be creating an issue for adding those tests and doing in a follow-up PR.

@zhenxiao @suneet-s What do you think?

Copy link
Copy Markdown
Contributor

@suneet-s suneet-s Feb 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jihoonson I'm not so familiar with this area of the code, so I'm happy to defer to your judgement. I like the tests you've described and think it's ok if tests come as a fast follow

@zhenxiao can you describe what manual testing you've done to verify this change? I'm sorry if this is coming across as nagging, I've just experienced a lot of pain trying to triage concurrency issues in the past.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@suneet-s what kind of manual testings you think need to be done? The change in this PR looks pretty straightforward and I don't think this PR would lead to any sort of concurrency issues at least for now.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for your comments @suneet-s @jihoonson
updated to unmodifiableMap
I will file an issue to add tests after this PR is merged

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I raised #9318.


private final DruidLeaderClient druidLeaderClient;

Expand All @@ -80,7 +79,7 @@ public CoordinatorRuleManager(
this.druidLeaderClient = druidLeaderClient;

this.rules = new AtomicReference<>(
new ConcurrentHashMap<String, List<Rule>>()
Collections.emptyMap()
);
}

Expand Down Expand Up @@ -120,7 +119,7 @@ public void stop()
return;
}

rules.set(new ConcurrentHashMap<String, List<Rule>>());
rules.set(Collections.emptyMap());

started = false;
exec.shutdownNow();
Expand Down Expand Up @@ -149,17 +148,13 @@ public void poll()
);
}

ConcurrentHashMap<String, List<Rule>> newRules = new ConcurrentHashMap<>(
(Map<String, List<Rule>>) jsonMapper.readValue(
rules.set(
Collections.unmodifiableMap(jsonMapper.readValue(
response.getContent(), new TypeReference<Map<String, List<Rule>>>()
{
}
)
))
);

log.debug("Got [%,d] rules", newRules.size());

rules.set(newRules);
}
catch (Exception e) {
log.error(e, "Exception while polling for rules");
Expand All @@ -169,7 +164,7 @@ public void poll()
public List<Rule> getRulesWithDefault(final String dataSource)
{
List<Rule> rulesWithDefault = new ArrayList<>();
ConcurrentMap<String, List<Rule>> theRules = rules.get();
Map<String, List<Rule>> theRules = rules.get();
List<Rule> dataSourceRules = theRules.get(dataSource);
if (dataSourceRules != null) {
rulesWithDefault.addAll(dataSourceRules);
Expand Down