Skip to content

projections + compaction + catalog + msq#17803

Merged
clintropolis merged 13 commits intoapache:masterfrom
clintropolis:projections-in-the-catalog
Apr 1, 2025
Merged

projections + compaction + catalog + msq#17803
clintropolis merged 13 commits intoapache:masterfrom
clintropolis:projections-in-the-catalog

Conversation

@clintropolis
Copy link
Copy Markdown
Member

@clintropolis clintropolis commented Mar 14, 2025

Description

changes:

  • CompactionTask now accepts a projections property which will cause classic and MSQ auto-compaction to build segments with projections
  • DataSourceCompactionConfig has been turned into an interface, with the existing implementation renamed to InlineSchemaDataSourceCompactionConfig
  • Added projections list to InlineSchemaDataSourceCompactionConfig to allow explicitly defining projections in an inline schema compaction spec
  • if not explicitly defined, compaction tasks will now preserve existing projections when processing segments, combining all named projections across the segments being processed - different projections with the same name are not checked for equivalence, rather one will be chosen dependent on segment processing order.
  • Added ability to define projections as a property of a datasource in the catalog
  • If projections are defined in a catalog, they will be automatically used by MSQ insert and replace queries
  • Added new experimental CatalogDataSourceCompactionConfig which allows populating much of a CompactionTask using information stored in the catalog. Currently this has some feature gaps compared to InlineSchemaDataSourceCompactionConfig, but will be improved in follow-up work to eventually become much more powerful than what can be expressed via a InlineSchemaDataSourceCompactionConfig
  • Moved MetadataCatalog to druid-server from the catalog extension
  • Added method to get MetadataCatalog from CatalogResolver
  • Added CatalogCoreModule to provide a null binding for MetadataCatalog, overridden if the catalog extension is loaded
  • Overlord added as a watcher for catalog like the Broker so that it can have CatalogResolver and MetadataCatalog available
  • Added binding for MetadataCatalog to Coordinator to have MetadataCatalg available
  • CatalogUpdateNotifier now periodically resyncs the catalog on clients, and retries resync failures, fixing an issue if a catalog client is started and the coordinator is not running
  • added CatalogClientConfig to control polling behavior for resyncs of catalog clients, similar to basic-auth cache
  • added ExcludeScope annotation so that CatalogClientModule can be skipped for coordinator node roles (when operating in combined coordinator overlord mode both roles are present, causing guice binding errors from attempted duplicate bindings).

API Examples

I used intellij rest client to run these, but should work with curl or whatever else too

list catalog tables (should be empty if un-initialized)

GET http://localhost:8888/druid/coordinator/v1/catalog/schemas/druid/tables
Accept: application/json
User-Agent: IntelliJ HTTP Client/IntelliJ IDEA 2024.3
Accept-Encoding: br, deflate, gzip, x-gzip

create projection for table wiki-projections-catalog

POST http://localhost:8888/druid/coordinator/v1/catalog/schemas/druid/tables/wiki-projections-catalog
User-Agent: IntelliJ HTTP Client/IntelliJ IDEA 2024.3
Content-Type: application/json
{
  "type": "datasource",
  "columns": [],
  "properties": {
    "segmentGranularity": "PT1H",
    "projections": [
      {
        "spec": {
          "name": "channel_page_hourly_distinct_user_added_deleted",
          "type": "aggregate",
          "virtualColumns": [
            {
              "type": "expression",
              "name": "__gran",
              "expression": "timestamp_floor(__time, 'PT1H')",
              "outputType": "LONG"
            }
          ],
          "groupingColumns": [
            {
              "type": "long",
              "name": "__gran",
              "multiValueHandling": "SORTED_ARRAY",
              "createBitmapIndex": false
            },
            {
              "type": "string",
              "name": "channel",
              "multiValueHandling": "SORTED_ARRAY",
              "createBitmapIndex": true
            },
            {
              "type": "string",
              "name": "page",
              "multiValueHandling": "SORTED_ARRAY",
              "createBitmapIndex": true
            }
          ],
          "aggregators": [
            {
              "type": "HLLSketchBuild",
              "name": "distinct_users",
              "fieldName": "user",
              "lgK": 12,
              "tgtHllType": "HLL_4"
            },
            {
              "type": "longSum",
              "name": "sum_added",
              "fieldName": "added"
            },
            {
              "type": "longSum",
              "name": "sum_deleted",
              "fieldName": "deleted"
            }
          ]
        }
      }
    ]
  }
}

get spec of catalog table

GET http://localhost:8888//druid/coordinator/v1/catalog/schemas/druid/tables/wiki-projections-catalog
Accept: application/json
User-Agent: IntelliJ HTTP Client/IntelliJ IDEA 2024.3
Accept-Encoding: br, deflate, gzip, x-gzip

update catalog table projections to add a projection (version is updated field from 'get' response)

POST http://localhost:8888/druid/coordinator/v1/catalog/schemas/druid/tables/wiki-projections-catalog?version=1740627900178
User-Agent: IntelliJ HTTP Client/IntelliJ IDEA 2024.3
Content-Type: application/json
{
  "type": "datasource",
  "columns": [],
  "properties": {
    "segmentGranularity": "PT1H",
    "projections": [
      {
        "spec": {
          "name": "channel_page_hourly_distinct_user_added_deleted",
          "type": "aggregate",
          "virtualColumns": [
            {
              "type": "expression",
              "name": "__gran",
              "expression": "timestamp_floor(__time, 'PT1H')",
              "outputType": "LONG"
            }
          ],
          "groupingColumns": [
            {
              "type": "long",
              "name": "__gran",
              "multiValueHandling": "SORTED_ARRAY",
              "createBitmapIndex": false
            },
            {
              "type": "string",
              "name": "channel",
              "multiValueHandling": "SORTED_ARRAY",
              "createBitmapIndex": true
            },
            {
              "type": "string",
              "name": "page",
              "multiValueHandling": "SORTED_ARRAY",
              "createBitmapIndex": true
            }
          ],
          "aggregators": [
            {
              "type": "HLLSketchBuild",
              "name": "distinct_users",
              "fieldName": "user",
              "lgK": 12,
              "tgtHllType": "HLL_4"
            },
            {
              "type": "longSum",
              "name": "sum_added",
              "fieldName": "added"
            },
            {
              "type": "longSum",
              "name": "sum_deleted",
              "fieldName": "deleted"
            }
          ]
        }
      },
      {
        "spec": {
          "name": "channel_minute_added",
          "type": "aggregate",
          "virtualColumns": [
            {
              "type": "expression",
              "name": "__gran",
              "expression": "timestamp_floor(__time, 'PT1M')",
              "outputType": "LONG"
            }
          ],
          "groupingColumns": [
            {
              "type": "long",
              "name": "__gran",
              "multiValueHandling": "SORTED_ARRAY",
              "createBitmapIndex": false
            },
            {
              "type": "string",
              "name": "channel",
              "multiValueHandling": "SORTED_ARRAY",
              "createBitmapIndex": true
            }
          ],
          "aggregators": [
            {
              "type": "longSum",
              "name": "sum_added",
              "fieldName": "added"
            }
          ]
        }
      }
    ]
  }
}

for coordinator based compaction

POST http://localhost:8081/druid/coordinator/v1/config/compaction
User-Agent: IntelliJ HTTP Client/IntelliJ IDEA 2024.3
Content-Type: application/json
{
    "type": "catalog",
    "dataSource": "wiki-projections-catalog",
    "engine": "native",
    "skipOffsetFromLatest": "PT0H",
    "taskPriority": 25,
    "inputSegmentSizeBytes": 100000000000000,
    "taskContext": null
  }

(alternative) create autocompaction supervisor if using compcation supervisors instead of coordinator compaction

POST http://localhost:8888/druid/indexer/v1/supervisor
User-Agent: IntelliJ HTTP Client/IntelliJ IDEA 2024.3
Content-Type: application/json
{
  "type": "autocompact",
  "spec": {
    "type": "catalog",
    "dataSource": "wiki-projections-catalog",
    "engine": "native",
    "skipOffsetFromLatest": "PT0H",
    "taskPriority": 25,
    "inputSegmentSizeBytes": 100000000000000,
    "taskContext": null
  },
  "suspended": true
}

Release note

todo


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • been tested in a test Druid cluster.

changes:
* `CompactionTask` now accepts a `projections` property which will cause classic and MSQ auto-compaction to build segments with projections
* `DataSourceCompactionConfig` has been turned into an interface, with the existing implementation renamed to `InlineSchemaDataSourceCompactionConfig`
* Added projections list to `InlineSchemaDataSourceCompactionConfig` to allow explicitly defining projections in an inline schema compaction spec
* if not explicitly defined, compaction tasks will now preserve existing projections when processing segments, combining all named projections across the segments being processed - different projections with the same name are not checked for equivalence, rather one will be chosen dependent on segment processing order.
* Added ability to define projections as a property of a datasource in the catalog
* If projections are defined in a catalog, they will be automatically used by MSQ insert and replace queries
* Added new experimental `CatalogDataSourceCompactionConfig` which allows populating much of a `CompactionTask` using information stored in the catalog. Currently this has some feature gaps compared to `InlineSchemaDataSourceCompactionConfig`, but will be improved in follow-up work to eventually become much more powerful than what can be expressed via a `InlineSchemaDataSourceCompactionConfig`
* Moved `MetadataCatalog` to druid-server from the catalog extension
* Added method to get `MetadataCatalog` from `CatalogResolver`
* Added `CatalogCoreModule` to provide a null binding for `MetadataCatalog`, overridden if the catalog extension is loaded
* Overlord added as a watcher for catalog like the Broker so that it can have `CatalogResolver` and `MetadataCatalog` available
* Added binding for `MetadataCatalog` to Coordinator to have `MetadataCatalg` available
Comment thread server/src/test/java/org/apache/druid/server/compaction/CompactionStatusTest.java Dismissed
@clintropolis clintropolis added this to the 33.0.0 milestone Mar 27, 2025
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indentation seems off here

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, 👍

fieldMapping.stream().map(Entry::getValue).collect(Collectors.toSet())
);

final List<AggregateProjectionSpec> projectionSpecs;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

break this out into its own method please; the current function is long enough as it is.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, also split out some other methods because yeah this was a lot

}


private void processProjections(final QueryableIndex index)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some comments here about the logic would be useful, especially how conflicts are handled between projections of different names.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added.

This got me thinking a bit though about what behavior would be better between using the first encountered vs the last encountered? Should it warn if there is a mismatch? Right now it is using last encountered, but i didn't give it a lot of thought, since the main goal for this discovery based stuff was for compaction to not automatically wipe out projections by default. My preference is that this will all be driven via the catalog instead of relying on discovery from segments, but this at least covers that case where the setup was not done (or not explicitly specified in the inline schema compaction spec).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it matters much. I don't think there would be a serious consequence to picking the "wrong" projection when there is a conflict. I think the current logic is ok, I just wanted to see a description of it.

return granularitySpec;
}

@JsonProperty
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be nullable? Are null and empty meaningfully different & can both happen?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not meaningfully different in the sense that neither will result in projections being built into the segment. I modified to coerce to empty so that they are considered equivalent for comparison.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wait, i think the modification wasn't the right thing to do since it will not compare correctly, reverting that and marking nullable for now.

return spec.properties();
}

public <T> T getProperty(String key)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Nullable? Also, the fact that decoding is automatic seems interesting and should be mentioned in the javadoc. Otherwise it's not clear why getProperty is different from calling properties() and doing a get.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair, renamed to decodeProperty which i think resolves the problem

}
}

public static boolean isDatasource(String tableType)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have javadoc about what tableType is meant to be. Like, what kinds of types can be put in here? Perhaps it's obvious to people that are more familiar with the catalog than I am. But, to me, it's not obvious.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i just shuffled some stuff around, this isn't a new method, but agree it could use some javadocs. This method only seems to be used by TableEditor, and also is always inverted (it is validating that catalog hidden column modifications are only applied to 'datasource' typed specs. I'll just add a note for now that the expected strings arguments are from TableSpec.type, but it does seem like this could be reworked to do this differently (same with the instanceOf version right below it)

return context;
}

@JsonProperty("projections")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be null, so how about adding @Nullable and also json-include only if nonnull?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@LoadScope(roles = NodeRole.BROKER_JSON_NAME)
public class CatalogBrokerModule implements DruidModule
@LoadScope(roles = {NodeRole.BROKER_JSON_NAME, NodeRole.OVERLORD_JSON_NAME})
@ExcludeScope(roles = {NodeRole.COORDINATOR_JSON_NAME})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need both @LoadScope and @ExcludeScope? Isn't not listing COORDINATOR_JSON_NAME in @LoadScope enough to not load it there?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that we need it because there's a conflict between this module and CatalogCoordinatorModule. Please add a javadoc reference to CatalogCoordinatorModule pointing out that the exclusion is needed because we can't load both in the same process.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added javadocs to link module and updated LoadScope/ExcludeScope javadoc to indicate that ExcludeScope takes priority if both are defined

import java.util.Objects;

public class DataSourceCompactionConfig
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = InlineSchemaDataSourceCompactionConfig.class)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the defaultImpl for backwards-compat reasons?

We should avoid it whenever possible, because it does this weird thing where if you provide an invalid type, rather than being an error, it's mapped to the defaultImpl. It tends to burn people that mistype things, forget to load extensions, etc.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea, it is for backwards compatibility. Looking into a custom serde to see if we can better restrict it to only allow coercion where type is missing instead of cases where it is incorrect.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be ideal, although it doesn't necessarily need to be done in this PR. If you do a custom serde it would be nice if it was generic enough to apply to other interfaces. (I would love to strike defaultImpl from the codebase.)

Copy link
Copy Markdown
Contributor

@gianm gianm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@clintropolis clintropolis merged commit 9026200 into apache:master Apr 1, 2025
76 checks passed
@clintropolis clintropolis deleted the projections-in-the-catalog branch April 1, 2025 19:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants