Broadcast join support based on broadcast rule #9953

@jihoonson

Motivation

Druid now supports real joins (#8728). Even though this opens up many opportunities to expand Druid's capabilities, it is still quite limited in practice, since we support only joins of a Druid datasource to lookups or inline datasources. This proposal is to support the broadcast join using the broadcast rule (#4077). The goals of the proposed design are:

  • Easy to update configuration: Druid operators should be able to easily broadcast/unbroadcast datasources.
  • Support joins to stream: The coordinator should be able to broadcast datasources to realtime tasks.
  • Memory-efficient handling of giant broadcast datasources: Broadcast datasources can be large, and Druid should be able to handle them efficiently.

Proposed changes

The idea is to pre-broadcast a datasource to every node and use it at query time. To do so, a datasource can be created using the regular ingestion spec and indexing service, and then broadcast by setting a BroadcastDistributionRule via the coordinator API. The entire datasource (all of its segments) will be broadcast together to every historical, task (or indexer), and broker. A broadcast datasource can be switched back to a regular datasource by setting a loadRule via the coordinator API. The coordinator will be responsible for broadcasting datasources, in the same way that it distributes regular datasources based on loadRules.
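As a rough illustration of switching a datasource between broadcast and regular mode, the sketch below builds (but does not send) the two coordinator requests. It assumes the existing rules endpoint `/druid/coordinator/v1/rules/{dataSourceName}` and the existing `broadcastForever` and `loadForever` rule types; the coordinator address, the datasource name `t2`, and the helper `rules_request` are hypothetical.

```python
import json
import urllib.request

COORDINATOR = "http://localhost:8081"  # hypothetical coordinator address


def rules_request(datasource: str, rules: list) -> urllib.request.Request:
    """Build (but do not send) a POST that sets the rules of a datasource."""
    return urllib.request.Request(
        url=f"{COORDINATOR}/druid/coordinator/v1/rules/{datasource}",
        data=json.dumps(rules).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Broadcast every segment of the hypothetical datasource "t2".
broadcast = rules_request("t2", [{"type": "broadcastForever"}])

# Switch it back to a regular datasource by setting a load rule.
unbroadcast = rules_request(
    "t2",
    [{"type": "loadForever", "tieredReplicants": {"_default_tier": 2}}],
)

# Against a live cluster, a request would be sent with:
#   urllib.request.urlopen(broadcast)
```

Building the request separately from sending it keeps the example runnable without a cluster.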

The BroadcastDistributionRule can be set for any datasource. However, for faster hash join, we can create a pre-built hash table at ingestion time and load it in query nodes. At query time, the join query engine will check whether there is a pre-built hash table or not. If it's there, the engine can use it. Otherwise, it can create one on the fly.
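The "use the pre-built table if present, otherwise build one on the fly" behavior can be sketched as follows. This is a minimal in-memory illustration, not Druid's join engine: rows are plain dictionaries, and `build_hash_table` and `hash_join` are hypothetical names.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Optional, Tuple

Row = Dict[str, object]
HashTable = Dict[Tuple, List[Row]]


def build_hash_table(rows: Iterable[Row], key_columns: List[str]) -> HashTable:
    """Group the rows of the broadcast side by their join key."""
    table = defaultdict(list)
    for row in rows:
        table[tuple(row[c] for c in key_columns)].append(row)
    return dict(table)


def hash_join(
    left: Iterable[Row],
    right_rows: List[Row],
    key_columns: List[str],
    prebuilt: Optional[HashTable] = None,
) -> List[Row]:
    """Inner equi-join; reuses a pre-built table when one is supplied."""
    # Fall back to building the table at query time if none was pre-built.
    table = prebuilt if prebuilt is not None else build_hash_table(right_rows, key_columns)
    joined = []
    for left_row in left:
        key = tuple(left_row[c] for c in key_columns)
        for right_row in table.get(key, []):
            joined.append({**right_row, **left_row})
    return joined
```

Both paths return the same result; the pre-built path only skips the build step at query time.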

The subsections below describe what should be done to implement the broadcast join with the pre-built hash table.

Pre-built hash table

The pre-built hash table is created by the index task. Since a broadcast datasource can be large, the query nodes (historicals, tasks, indexers, and brokers) should be able to read it using mmap, as they do for regular segments, to avoid heavy GC overhead.

The format of the hash table is not determined yet. I will update this proposal or write another proposal for it. Whatever the new format turns out to be, a new SegmentizerFactory will be added to handle it properly.

The index task will create pre-built hash tables

To let the task know what kind of segments/indexes it needs to create, we need a new configuration in the ingestion spec. However, I'm not yet sure what a good configuration for it would be. One of the reasons is the undetermined hash table format: the new configuration might differ depending on what we want to do with the format. For example, we need at least the list of columns to build the hash table on, which should be specified in the user configuration. I'm not sure yet what else we need.

As a result, I suggest adding a SegmentizerFactory to the tuningConfig as an interim solution. SegmentizerFactory is already an extensible, JSON-serializable interface. This lets us test out various segment/index formats for hash join. Once the dust settles and we find the best format, we can deprecate the SegmentizerFactory setting and add a better configuration to the ingestion spec. An example would be:

    "tuningConfig" : {
      "type" : "index_parallel",
      "segmentizerFactory" : {
        "type" : "hashIndexedTable",
        "keyColumns" : ["columns", "to", "create", "hash table"]
      },
      ...
    }

If the SegmentizerFactory is missing in the ingestion spec, the MMappedQueryableSegmentizerFactory will be used by default. The index task will write the SegmentizerFactory to the factory.json file when it writes the segment file. See the "SegmentizerFactory for loading data and pre-built hash table" section below for how to read the segment/hash table file.
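The defaulting rule can be sketched as below. This only illustrates the control flow: `write_factory_json` is a hypothetical helper, and the `"mmapDefault"` type string is a placeholder for whatever JSON form the MMappedQueryableSegmentizerFactory takes, not its actual serialized name.

```python
import json
import tempfile
from pathlib import Path

# Placeholder (assumption) for the JSON form of MMappedQueryableSegmentizerFactory.
DEFAULT_FACTORY = {"type": "mmapDefault"}


def write_factory_json(segment_dir: Path, tuning_config: dict) -> dict:
    """Write factory.json next to the segment files and return what was written."""
    # Use the factory from the tuningConfig if present, otherwise the default.
    factory = tuning_config.get("segmentizerFactory") or DEFAULT_FACTORY
    (segment_dir / "factory.json").write_text(json.dumps(factory))
    return factory


with tempfile.TemporaryDirectory() as tmp:
    # No segmentizerFactory in the spec, so the default is written.
    written = write_factory_json(Path(tmp), {"type": "index_parallel"})
    print(written)
```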

Appending more data to a broadcast datasource is not allowed. That is, the whole datasource must be broadcast together.

Modifying BroadcastDistributionRule

The BroadcastDistributionRule has a colocatedDatasources field. This field was intended to let users specify which datasources the broadcast datasource should be colocated with. It causes a bug in broadcasting: the broadcast segment should be moved atomically together with the colocated segment, which does not happen today. To fix this issue, I think it's better to remove the field, because a regular datasource is usually distributed to all historicals if it's large enough, so colocatedDatasources doesn't seem very useful anyway. Instead, we can add tiers, similar to loadRules.

I believe this doesn't cause any compatibility issue, as the field is not practically in use in any production deployment. I'm pretty sure about this because 1) we didn't have a proper use case for it before, i.e. joins, and 2) it is only partially implemented and doesn't fully work.

Assigning and balancing segments

The coordinator should assign all segments of a broadcast datasource to all historicals, realtime tasks, brokers, and indexers. When assigning segments, replication throttling shouldn't apply for broadcasting. Also, the coordinator should skip balancing for broadcasted datasources.
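The assignment policy above can be sketched as follows. This is a simplified model under stated assumptions, not the coordinator's actual code: `Server`, `assign_broadcast_segments`, and `balancing_candidates` are hypothetical names, and segments are identified by plain strings.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Server:
    name: str
    kind: str  # "historical", "broker", "realtime_task", or "indexer"
    segments: Set[str] = field(default_factory=set)


def assign_broadcast_segments(
    broadcast_segments: Set[str], servers: List[Server]
) -> Dict[str, List[str]]:
    """Return, per server, the broadcast segments it still has to load.

    Every server gets every segment, and no replication throttle is applied.
    """
    to_load = {}
    for server in servers:
        missing = sorted(broadcast_segments - server.segments)
        if missing:
            to_load[server.name] = missing
    return to_load


def balancing_candidates(
    datasource_by_segment: Dict[str, str], broadcast_datasources: Set[str]
) -> List[str]:
    """Segments the balancer may move; broadcast datasources are skipped."""
    return sorted(
        segment
        for segment, datasource in datasource_by_segment.items()
        if datasource not in broadcast_datasources
    )
```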

Segment loading in brokers, realtime tasks, and indexers

All brokers, realtime tasks, and indexers will load assigned segments. Both ZK-based and HTTP-based segment loading should be supported.

SegmentizerFactory for loading data and pre-built hash table

Once a segment file is downloaded in a historical, a task, an indexer, or a broker, the file should be loaded properly including the pre-built hash table. The node should create the right type of SegmentizerFactory from the factory.json file created by the index task. The new SegmentizerFactory should know details of how to load data and the hash table. Once the SegmentizerFactory finishes loading, the node announces the loaded segment.
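A minimal sketch of this load path is shown below, assuming a registry keyed by the `type` field of factory.json. The registry `LOADERS`, the function `load_and_announce`, and the `"mmapDefault"` type string are hypothetical; `hashIndexedTable` is taken from the tuningConfig example in this proposal. The loaders return descriptive strings in place of real segment objects.

```python
import json
import tempfile
from pathlib import Path
from typing import Callable, Dict, List

# Hypothetical registry: factory type -> function that loads a segment directory.
LOADERS: Dict[str, Callable[[Path, dict], str]] = {
    "mmapDefault": lambda seg_dir, spec: f"mmapped segment {seg_dir.name}",
    "hashIndexedTable": lambda seg_dir, spec: (
        f"segment {seg_dir.name} with hash table on {spec['keyColumns']}"
    ),
}


def load_and_announce(segment_dir: Path, announced: List[str]) -> str:
    """Load a downloaded segment via its factory.json, then announce it."""
    spec = json.loads((segment_dir / "factory.json").read_text())
    loaded = LOADERS[spec["type"]](segment_dir, spec)
    # Announce only after loading (including the hash table) has finished.
    announced.append(segment_dir.name)
    return loaded


with tempfile.TemporaryDirectory() as tmp:
    seg_dir = Path(tmp)
    spec = {"type": "hashIndexedTable", "keyColumns": ["col"]}
    (seg_dir / "factory.json").write_text(json.dumps(spec))
    announced: List[str] = []
    print(load_and_announce(seg_dir, announced))
```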

Querying broadcasted datasources

All broadcast datasources will be registered in the index schema. To join using SQL, you can write a query like the one below:

SELECT count(*)
FROM druid.t1, index.t2
WHERE t1.col = t2.col

For joins to broadcast datasources, the broker will find the most recent broadcast segments which are available in all nodes where it wants to send the query. If there is no such broadcast segment, the query will fail. Once it finds such broadcast segments, it will send the join query to all nodes which have the broadcast segments and collect the result from them. Note that the join of a broadcast datasource and another broadcast datasource will not be supported in this proposal.
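The selection rule for join queries can be sketched as below. It is a simplification under stated assumptions: each node is modeled as holding a set of complete broadcast versions rather than individual segments, version strings are assumed to sort chronologically (as timestamp-style versions do), and `pick_broadcast_version` and `NoCommonBroadcastVersion` are hypothetical names.

```python
from typing import Dict, List, Optional, Set


class NoCommonBroadcastVersion(Exception):
    """Raised when the target nodes share no broadcast version."""


def pick_broadcast_version(
    target_nodes: List[str], versions_by_node: Dict[str, Set[str]]
) -> str:
    """Return the most recent version that is loaded on every target node."""
    common: Optional[Set[str]] = None
    for node in target_nodes:
        loaded = versions_by_node.get(node, set())
        common = set(loaded) if common is None else common & loaded
    if not common:
        # No version is available everywhere, so the join query fails.
        raise NoCommonBroadcastVersion(target_nodes)
    return max(common)


versions = {
    "historical-1": {"2020-05-01T00:00:00Z", "2020-06-01T00:00:00Z"},
    "historical-2": {"2020-05-01T00:00:00Z"},
}
# The June version is newer but is not yet loaded on historical-2.
print(pick_broadcast_version(["historical-1", "historical-2"], versions))
```

Taking the intersection first means a newer version is used only once every target node has it.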

For queries directly on a broadcast datasource, the broker should pick only one node among the historicals and brokers, including itself, and send the query to it.

Rationale

  • Expanding the Lookup APIs to support wide tables. This idea was dropped because the Lookup API is tightly coupled with its key-value data model. It also has its own way of loading data, which is pretty limited compared to the regular ingestion tasks.

Operational impact

As explained in the "Modifying BroadcastDistributionRule" section, the colocatedDatasources will be removed from BroadcastDistributionRule. However, I don't think this will cause any compatibility issue.

Future work

  • Tier support for broadcast rule
  • Stabilizing the Indexer. Since the broadcast datasource will be replicated per task with the middleManager, it can occupy a huge amount of memory. It is more realistic to use the Indexer for broadcast joins to stream.
  • Support joins of a broadcast datasource to another broadcast datasource.
  • Broadcast rule and lookup segments. The lookup API can be replaced with the combination of the broadcast rule and lookup segments.
  • Broadcasting only a part of a datasource. You may want to broadcast only the segments of some recent time period because they are frequently joined. In this case, allowing only a part of a datasource to be broadcast would be useful. We can similarly allow appending data to broadcast datasources.
