Add BroadcastDistributionRule #4077

Merged
fjy merged 9 commits into apache:master from jihoonson:broadcast
May 1, 2017

Conversation

Contributor

@jihoonson jihoonson commented Mar 17, 2017

First patch for #4032.



@fjy fjy added this to the 0.10.1 milestone Mar 20, 2017
@jon-wei jon-wei self-assigned this Mar 22, 2017
Contributor

@jon-wei jon-wei left a comment


Had some minor comments, otherwise LGTM

"Failed to replicate segment[%s] to server[%s] due to insufficient available space",
segment.getIdentifier(),
holder.getServer().getHost()
)
Contributor

is the formatting off here?

Contributor Author

No, this is Druid's format, if I didn't make a mistake. Maybe it would be better to use a separate string variable for the error message, like below?

String message = String.format(...);
log.makeAlert(message)
    .addData("segmentSize", segment.getSize())
    .addData("availableSize", holder.getAvailableSize())
    .emit();

{
static final String TYPE = "broadcastForever";

private final String colocateDataSource;
Contributor

suggest renaming this here and elsewhere to colocatedDatasource

Contributor Author

Thanks. Done.

@weijietong weijietong left a comment

Is replicator support WIP, or not supported?

.addData("availableSize", holder.getAvailableSize())
.emit();
} else {
holder.getPeon().loadSegment(

I don't see replicator support here. Is it coming in a later patch?

Contributor Author

You mean ReplicationThrottler?
ReplicationThrottler is used to throttle replication speed, and I don't think it's necessary for broadcast rules because users usually want broadcasts to complete as soon as possible.

@gianm gianm added the Feature label Mar 24, 2017
Contributor

@gianm gianm left a comment


Generally lgtm, just a few comments.

@jihoonson if you think the load rule API here is going to be stable as you continue working, please document it where other load rules are documented. Otherwise I'd say you can leave it undocumented until it's stable.

@JsonProperty("colocatedDatasource") String colocatedDatasource
)
{
this.colocatedDatasource = Objects.requireNonNull(colocatedDatasource);
Contributor

Most other Druid code uses Preconditions.checkNotNull but I guess there's no real need to be consistent there.

// Find servers which hold the segments of the co-located data source
final List<ServerHolder> targetServerHolders = params
.getDruidCluster().getAllServers().stream()
.filter(eachHolder -> eachHolder.getServer().getDataSource(getColocatedDatasource()) != null)
Contributor

It seems to me like it'd be valuable to have colocatedDataSources be a list, and maybe if it's empty then broadcast to all servers. That'd give the user some more flexibility about where to broadcast their data sources.

Contributor Author

Thanks. Sounds nice.
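To make the suggestion concrete, here is a minimal, runnable sketch of that filtering behavior. The `Server` class, its `serves` method, and `findTargets` are hypothetical stand-ins invented for illustration, not Druid's actual ServerHolder API; the key behavior is that an empty colocated list means broadcast to every server.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for Druid's ServerHolder; not the real API.
class Server {
    private final String name;
    private final Set<String> dataSources;

    Server(String name, Set<String> dataSources) {
        this.name = name;
        this.dataSources = dataSources;
    }

    String getName() { return name; }

    boolean serves(String dataSource) { return dataSources.contains(dataSource); }
}

public class BroadcastTargets {
    // An empty (or null) colocated list means "broadcast to every server";
    // otherwise keep only servers holding at least one colocated data source.
    static List<Server> findTargets(List<Server> allServers, List<String> colocatedDataSources) {
        if (colocatedDataSources == null || colocatedDataSources.isEmpty()) {
            return allServers;
        }
        return allServers.stream()
            .filter(s -> colocatedDataSources.stream().anyMatch(s::serves))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Server> servers = List.of(
            new Server("h1", Set.of("wikipedia")),
            new Server("h2", Set.of("twitter"))
        );
        System.out.println(findTargets(servers, List.of("wikipedia")).size()); // 1
        System.out.println(findTargets(servers, List.of()).size());            // 2
    }
}
```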

stats.addToGlobalStat(LoadRule.ASSIGNED_COUNT, 0);

for (ServerHolder holder : serverHolderList) {
if (segment.getSize() > holder.getAvailableSize()) {
Contributor

Some sites use the main alert text to collect and summarize similar alerts. So it'd be nicer for this description to be ("Failed to broadcast segment for[%s]", dataSource) with the segment id and specific server as additional data.
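A toy illustration of the suggested restructuring, using a plain map instead of Druid's alerting API: the main description stays stable and keyed on the data source so similar alerts group together, while the per-segment details travel as attached data. All names here are invented for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AlertExample {
    // Stable main message keyed on the data source; the segment id and the
    // specific server go into the alert data rather than the message text.
    static Map<String, Object> buildAlert(String dataSource, String segmentId, String server) {
        Map<String, Object> alert = new LinkedHashMap<>();
        alert.put("description", String.format("Failed to broadcast segment for[%s]", dataSource));
        alert.put("segmentId", segmentId);
        alert.put("server", server);
        return alert;
    }

    public static void main(String[] args) {
        System.out.println(buildAlert("wikipedia", "wikipedia_2017-01-01", "host1:8083"));
    }
}
```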

return stats;
}

public abstract String getColocatedDatasource();
Contributor

Please spell this as getColocatedDataSource (or dataSources) not datasource.

}

@Override
public boolean equals(Object o)
Contributor

Should have hashCode overridden too so it is consistent with equals.
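A minimal illustration of the point (the `Rule` class here is a simplified stand-in, not the real Druid class): two rules that compare equal must produce the same hash over exactly the fields that `equals` compares, or hash-based collections will treat them as distinct.

```java
import java.util.Objects;

// Simplified stand-in for a Druid rule, showing hashCode kept
// consistent with equals.
public class Rule {
    private final String type;
    private final String colocatedDataSource;

    public Rule(String type, String colocatedDataSource) {
        this.type = type;
        this.colocatedDataSource = colocatedDataSource;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Rule)) return false;
        Rule other = (Rule) o;
        return Objects.equals(type, other.type)
            && Objects.equals(colocatedDataSource, other.colocatedDataSource);
    }

    @Override
    public int hashCode() {
        // Must hash exactly the fields compared in equals.
        return Objects.hash(type, colocatedDataSource);
    }

    public static void main(String[] args) {
        Rule a = new Rule("broadcastForever", "wikipedia");
        Rule b = new Rule("broadcastForever", "wikipedia");
        System.out.println(a.equals(b) && a.hashCode() == b.hashCode()); // true
    }
}
```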

Contributor

Please override toString too, I think there's some code that prints rules.

Contributor Author

Would you give me some examples where toString is used? It's not overridden for other rules as well.

Contributor

Ah, nevermind, you're right.

}

@Override
public boolean equals(Object o)
Contributor

Override hashCode too

@weijietong

@jihoonson I have read this part of the PR. The 'BroadcastDistributionRule' is a really good way to express data source distribution for the join target.

But another thing that needs care is DruidCoordinatorBalancer. As I mentioned before, Druid currently hard-codes the use of CostBalancerStrategy. When DruidCoordinatorBalancer runs periodically, it may break the data source distribution specified by the broadcast or LocalJoin load rule.

My solution is to rewrite CostBalancerStrategy so that my specific BalancerStrategy first picks a segment to move, and then checks the segment's data source again to make sure the move doesn't break the distribution rule. A little indirect and not very extensible. I wonder whether you have thought about this, and whether anyone else has a good solution?

@jihoonson
Contributor Author

@gianm thanks. Addressed your comments. I'll document soon.
@weijietong I haven't thought about that yet, but your idea makes sense to me. I'll think about it too.

@jihoonson
Contributor Author

@gianm documented broadcast rules.

@jihoonson
Contributor Author

The Travis failure seems unrelated to this patch. @jon-wei, would you restart the build? I don't have permission.

@jihoonson
Contributor Author

I updated the patch to drop segments which are no longer co-located.


final CoordinatorStats stats = new CoordinatorStats();

return stats.accumulate(assign(loadServerHolders, segment))
Contributor

I'm wondering if the broadcast rules should label their segments a little differently, or at least be able to distinguish them from actual Druid segments.

Contributor

It'll definitely make reading the logs easier

Contributor Author

I think broadcasted segments should be regarded the same as other segments. Every type of query can run on broadcasted segments, but the broadcast information is required only by joins.

The information about which data sources are broadcasted is required by brokers and historicals (and realtimes in the future). Brokers decide which nodes a given query should be distributed to, and this is determined by selecting the nodes holding the segments of non-broadcasted tables. Historicals, as proposed in #4032, join broadcasted segments first and then join the result with non-broadcasted segments in parallel.

I'm currently thinking that brokers can figure out which tables are broadcasted by looking at BroadcastDistributionRules, and they will add this information to the QueryContext for historicals. If this works, broadcast segments can be regarded as normal.

What do you think?

Contributor

@jihoonson Okay that makes sense to me. Can broadcast segments be created dynamically?

Contributor Author

Maybe possible in the future when we support the feature of broadcasting tables on the fly, but not yet.

@jihoonson
Contributor Author

I found two problems when this patch is applied.

  1. Segments of broadcast data sources might not be fully co-located with segments of colocatedDataSources if load/drop rules for colocatedDataSources are run before BroadcastDistributionRule.
  2. Since DruidCoordinatorBalancer is run after DruidCoordinatorRuleRunner, DruidCoordinatorBalancer breaks the co-location of broadcast segments and colocatedDataSources' segments.

The solution I'm currently considering is as follows.

  1. Building a dependency graph of rules, and run rules from independent ones to dependent ones. There must be no cycle in the dependency graph. This solution changes Druid's rule application policy, 'first matched rule is applied', so we need more discussion.
  2. Changing the unit for load balancing from a segment to a set of co-located segments. The cost for moving co-located segments will be definitely large, so usually independent segments will be chosen for load balancing.

Since BroadcastDistributionRules are only introduced here and not yet used anywhere, I think it would be ok to fix these problems in follow-up PRs.
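The first proposed solution can be sketched as an ordinary topological sort over the rule dependency graph. Everything below is hypothetical, invented for illustration (rules are just strings here, and the dependency map is a stand-in for whatever structure Druid would actually use): rules with no dependencies run first, and a cycle is reported as an error, matching the "no cycle" requirement.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RuleOrdering {
    // Kahn's algorithm: dependsOn maps each rule to the rules it depends on.
    // Returns an order where every rule appears after all of its dependencies.
    static List<String> topologicalOrder(Map<String, List<String>> dependsOn) {
        Map<String, Integer> inDegree = new HashMap<>();
        Map<String, List<String>> dependents = new HashMap<>();
        for (String rule : dependsOn.keySet()) {
            inDegree.putIfAbsent(rule, 0);
            for (String dep : dependsOn.get(rule)) {
                inDegree.putIfAbsent(dep, 0);
                inDegree.merge(rule, 1, Integer::sum);
                dependents.computeIfAbsent(dep, k -> new ArrayList<>()).add(rule);
            }
        }
        Deque<String> ready = new ArrayDeque<>();
        inDegree.forEach((rule, d) -> { if (d == 0) ready.add(rule); });
        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String rule = ready.poll();
            order.add(rule);
            for (String dependent : dependents.getOrDefault(rule, List.of())) {
                if (inDegree.merge(dependent, -1, Integer::sum) == 0) {
                    ready.add(dependent);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("cycle in rule dependencies");
        }
        return order;
    }

    public static void main(String[] args) {
        Map<String, List<String>> deps = new HashMap<>();
        deps.put("broadcast(lookups)", List.of("load(wikipedia)"));
        deps.put("load(wikipedia)", List.of());
        // load(wikipedia) comes before broadcast(lookups)
        System.out.println(topologicalOrder(deps));
    }
}
```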

@fjy
Contributor

fjy commented Apr 5, 2017

👍

@gianm
Contributor

gianm commented Apr 12, 2017

@jihoonson, on the two points you raised, it sounds like the issue is that (a) loading new segments, and (b) moving existing segments for balancing, could break broadcasting if a segment from the colocated datasource is moved onto a historical node that didn't already have any segments from that datasource.

I guess in practice this is potentially an issue, but won't be a huge one, since odds are that a distributed datasource will have at least one segment on all historical nodes in a given tier anyway (unless it has a number of segments smaller than the number of historical nodes).

To keep things (very) simple, what do you think about removing the "colocatedDataSources" concept, and instead broadcasting the broadcast datasources to every historical node in the given tier for the rule? I think in practice this achieves mostly the same thing (given the reason above) as having colocatedDataSources. It also avoids bugs from this effect, since every historical in a tier will have the broadcast segments and there's no need to worry.

@gianm
Contributor

gianm commented Apr 12, 2017

Or at least, making that the behavior if colocatedDataSources is empty, so users can leave it empty and not have to worry.

@gianm
Contributor

gianm commented Apr 12, 2017

The docs should mention this caveat.

@jihoonson
Contributor Author

@gianm sounds good, but I'm concerned about usability. If we support broadcasting to tiers instead of broadcasting to data sources, users will need two steps to design their data distribution: they first need to check which tiers each data source to be joined is loaded on, and then decide which data sources need to be broadcasted to which tiers. Broadcasting to data sources would be simpler because users only need to decide which data sources are broadcasted alongside which other data sources.
In addition, if the load rules for a data source change, broadcast rules relating to that data source must be changed accordingly. This would be inconvenient.
What do you think?

@jihoonson
Contributor Author

@gianm if colocatedDataSources is null or empty, segments are broadcasted to all nodes.

@jihoonson
Contributor Author

Maybe we can support two different types of broadcast rules for 1) broadcasting to tiers and 2) colocating data sources.

@gianm
Contributor

gianm commented Apr 13, 2017

The latter suggestion sounds good. I think in practice many users will choose to broadcast to tiers -- since if you're already very aware of your tier structure (as many users are, if they use tiers) then it should be pretty easy to choose the right tier for a broadcast. It also works around the issue you brought up, which is nice.

The colocated data source option is a nice alternative that would make configuration simpler, just should come with a caveat about the issue you brought up.

@jihoonson
Contributor Author

Ok, then I'll make another PR for broadcasting to tiers.

@jihoonson
Contributor Author

@gianm, I added a caveat


public Set<ServerHolder> getAllServers()
{
final Set<ServerHolder> servers = new HashSet<>();
Contributor

Is there a reason this is a Set? Set seems suspicious to me (vs. List) for two reasons.

  • I think DruidCluster is meant to only have a single copy of each server anyway
  • ServerHolder equals includes a call to LoadQueuePeon.equals, which is based on reference equality, so it seems unreliable to use it to dedupe servers

Contributor Author

I mistakenly thought the same server could be involved in different tiers, which it can't. I changed it to a list.

Contributor Author

@gianm I found that ServerHolder.equals() calls ImmutableDruidServer.equals() and LoadQueuePeon.equals(), which are not implemented. Maybe this is not a critical problem for now because they aren't called anywhere yet. Anyway, I'll fix it.
Btw, to check the equality of two ServerHolders, I think it would be enough to compare ImmutableDruidServer.getHost(), ImmutableDruidServer.getType(), and ImmutableDruidServer.getTier(). What do you think?

The interval of a segment will be compared against the specified period. The period is from some time in the past to the current time. The rule matches if the period contains the interval.

<div class="note caution">
broadcastToDataSources rules don't guarantee that segments of the data sources are always co-located.
Contributor

This comment would be better if it explained why. Maybe adding "because segments for the colocated data sources are not loaded together atomically".

Contributor Author

Added. Thanks.


{
"type" : "broadcastToDataSourcesForever",
Contributor

I think this should just be called broadcastForever, since the dataSources part is optional. And similar comment for the other kinds of broadcast rules.

Contributor Author

Renamed.

stats.addToGlobalStat(LoadRule.ASSIGNED_COUNT, 0);

for (ServerHolder holder : serverHolders) {
if (segment.getSize() > holder.getAvailableSize()) {
Contributor

I think that like CostBalancerStrategy, this should check if holder.isLoadingSegment(segment) and skip if so.

Contributor Author

Thanks! I added it.
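The skip logic being discussed can be sketched in a self-contained way. Everything below (`Holder`, `assign`, the set fields) is a hypothetical stand-in for Druid's ServerHolder and peon machinery, invented for illustration: a server is skipped if the segment doesn't fit, if it already serves the segment, or if it is already loading it.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AssignSketch {
    // Hypothetical stand-in for ServerHolder; not the real API.
    static class Holder {
        final long availableSize;
        final Set<String> serving = new HashSet<>();
        final Set<String> loading = new HashSet<>();

        Holder(long availableSize) { this.availableSize = availableSize; }

        boolean isServingSegment(String id) { return serving.contains(id); }
        boolean isLoadingSegment(String id) { return loading.contains(id); }
    }

    // Returns how many servers the segment was newly assigned to.
    static int assign(List<Holder> holders, String segmentId, long segmentSize) {
        int assigned = 0;
        for (Holder holder : holders) {
            if (segmentSize > holder.availableSize) {
                continue; // the real rule alerts here: insufficient space
            }
            if (holder.isServingSegment(segmentId) || holder.isLoadingSegment(segmentId)) {
                continue; // already has it, or is already fetching it
            }
            holder.loading.add(segmentId); // stands in for peon.loadSegment(...)
            assigned++;
        }
        return assigned;
    }

    public static void main(String[] args) {
        Holder fresh = new Holder(100);
        Holder alreadyLoading = new Holder(100);
        alreadyLoading.loading.add("seg1");
        System.out.println(assign(List.of(fresh, alreadyLoading), "seg1", 10)); // 1
    }
}
```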

if (colocatedDataSources.stream()
.anyMatch(source -> eachHolder.getServer().getDataSource(source) != null)) {
loadServerHolders.add(eachHolder);
} else if (eachHolder.isServingSegment(segment)) {
Contributor

I think this should skip holders that already have the segment in their drop lists (peon.getSegmentsToDrop()), like DruidCoordinator.moveSegment and DruidCoordinatorCleanupUnneeded.

Contributor Author

Thanks! I added it.

Contributor

@gianm gianm left a comment

LGTM, thanks @jihoonson!
