
Add policy enforcer to sanity check on policy in query execution #17774

Merged
gianm merged 111 commits into apache:master from cecemei:policy
Apr 25, 2025

Conversation

@cecemei
Contributor

@cecemei cecemei commented Mar 3, 2025

Description

Added the PolicyEnforcer interface, with NoopPolicyEnforcer and RestrictAllTablesPolicyEnforcer implementations. It is configurable via druid.policy.enforcer, through PolicyModule.

  • On query planning/execution on the broker, PolicyEnforcer takes effect while building DruidQuery and in the authorization step in QueryLifecycle, via withPolicies(Map<String, Optional<Policy>> policyMap, PolicyEnforcer policyEnforcer). Specifically, enforcer.validateOrElseThrow(TableDataSource ds, Policy policy) throws an exception if validation fails.
  • On the query runner/processor, PolicyEnforcer is passed in ExecutionVertex.createSegmentMapFunction(PolicyEnforcer); when the mapFn is called, it validates the mapped segment via SegmentReference.validateOrElseThrow(PolicyEnforcer policyEnforcer). Specifically,
    • for HashJoinSegment and WrappedSegment, call delegate.validateOrElseThrow;
    • for RestrictedSegment and ReferenceCountingSegment, call the enforcer to validate with a ReferenceCountingSegment and policy (null for a ReferenceCountingSegment not wrapped in a RestrictedSegment).

As a singleton, the enforcer is injected into QueryLifecycleFactory and TaskToolboxFactory, and makes its way down to SinkQuerySegmentWalker, FrameContext and QueryLifecycle.
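
The enforcer contract described above can be sketched roughly as follows. This is a simplified, hypothetical stand-in, not Druid's actual classes: Policy and TableDataSource here are placeholder types, and the real signatures in the PR differ.

```java
public class PolicyEnforcerSketch {
    // Placeholder stand-ins for the Druid types discussed in this PR.
    static final class Policy {}
    static final class TableDataSource {
        final String name;
        TableDataSource(String name) { this.name = name; }
    }

    // Sketch of the validate-or-throw contract: validate() decides whether a
    // (possibly null) policy is acceptable; validateOrElseThrow() fails loudly.
    interface PolicyEnforcer {
        boolean validate(Policy policy);

        default void validateOrElseThrow(TableDataSource ds, Policy policy) {
            if (!validate(policy)) {
                throw new IllegalStateException(
                    "Failed security validation with dataSource [" + ds.name + "]");
            }
        }
    }

    // Permits everything, including tables with no policy attached.
    static final class NoopPolicyEnforcer implements PolicyEnforcer {
        @Override public boolean validate(Policy policy) { return true; }
    }

    // Requires that every table has some policy attached.
    static final class RestrictAllTablesPolicyEnforcer implements PolicyEnforcer {
        @Override public boolean validate(Policy policy) { return policy != null; }
    }
}
```

Under this reading, a NoopPolicyEnforcer accepts a table with no policy, while a RestrictAllTablesPolicyEnforcer throws for it — matching the broker-side behavior the description attributes to withPolicies.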

Additionally,

  • refactored ServerManagerTest to use Guice bindings (new test dependency), and TestSegmentCacheManager for loading segments.
  • added an MSQTaskQueryMakerTest class for easy binding of the enforcer.

Key changed/added classes in this PR
  • PolicyEnforcer
  • NoopPolicyEnforcer (with test class too)
  • RestrictAllTablesPolicyEnforcer (with test class too)
  • PolicyModule (with test class too)
  • ServerManagerTest
  • MSQTaskQueryMakerTest

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@cecemei cecemei marked this pull request as ready for review March 5, 2025 06:14
@cryptoe cryptoe requested a review from clintropolis March 6, 2025 04:19
@cecemei cecemei changed the title Some policy config Add policy config to allow sanity-check on policy in data nodes Mar 6, 2025
@kgyrtkirk
Member

left a few comments; happy to talk about it further!

cecemei and others added 12 commits March 31, 2025 00:49
Implement pushTaskPayload/streamTaskPayload as introduced in apache#14887
for HDFS storage to allow larger mm-less ingestion payloads when using
HDFS as the deep storage location.
* Add deprecated com.google.common.io.Files#write to forbiddenApis

* Replace deprecated Files.write()
Mistakenly categorized under deep storage instead of metadata store.
Changes
---------
- Bind `SegmentMetadataCache` only once to
`HeapMemorySegmentMetadataCache` in `SQLMetadataStorageDruidModule`
- Invoke start and stop of the cache from `DruidOverlord` rather than on lifecycle start/stop
- Do not override the binding in `CliOverlord`
…task time (apache#17770)

Changes
---------
- Use `maxIntervalToKill` to determine search interval for killing unused segments.
- If no segment has been killed for the datasource yet, use durationToRetain
…pec was unmodified (apache#17707)

Add an optional query parameter called skipRestartIfUnmodified to the
/druid/indexer/v1/supervisor endpoint. Callers can set skipRestartIfUnmodified=true
to not restart the supervisor if the spec is unchanged.

Example:

curl -X POST --header "Content-Type: application/json" -d @supervisor.json
localhost:8888/druid/indexer/v1/supervisor?skipRestartIfUnmodified=true
Changes
---------
- Emit time lag from Kafka similar to Kinesis as metrics `ingest/kafka/lag/time`,
`ingest/kafka/maxLag/time`, `ingest/kafka/avgLag/time`
- Add new method in `KafkaSupervisor` to fetch timestamps of latest records in stream to compute time lag
- Add new field `emitTimeLagMetrics` in `KafkaSupervisorIOConfig` to toggle emission of new metrics
* suggest filter values when known

* update snapshots

* add more d

* fix load rule clamp

* better segment timeline init
@FrankChen021 FrankChen021 requested a review from Copilot April 11, 2025 01:48
Contributor

Copilot AI left a comment


Copilot reviewed 103 out of 104 changed files in this pull request and generated no comments.

Files not reviewed (1)
  • extensions-core/multi-stage-query/pom.xml: Language not supported
Comments suppressed due to low confidence (3)

extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/MSQTestBase.java:541

  • [nitpick] Ensure that the binding for PolicyEnforcer follows the project’s standard pattern for dependency injection and consider adding a brief comment explaining why the NoopPolicyEnforcer is used in tests.
binder -> binder.bind(PolicyEnforcer.class).toInstance(NoopPolicyEnforcer.instance())

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/BaseLeafFrameProcessorFactory.java:165

  • Confirm that the updated createSegmentMapFunction method correctly accepts a PolicyEnforcer parameter and that all implementations have been updated accordingly.
final Function<SegmentReference, SegmentReference> segmentMapFn = ExecutionVertex.of(query).createSegmentMapFunction(frameContext.policyEnforcer());

extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/SimpleSegmentMapFnProcessor.java:50

  • [nitpick] Consider updating or adding unit tests to verify that the behavior of the segment mapping function is correctly influenced by the provided PolicyEnforcer.
public SimpleSegmentMapFnProcessor(final Query<?> query, final PolicyEnforcer policyEnforcer)

@cecemei cecemei changed the title Add policy config to allow sanity-check on policy in data nodes Add policy enforcer to sanity check on policy in query execution Apr 11, 2025
@cecemei
Contributor Author

cecemei commented Apr 11, 2025

TY this is looking good so far. I think the change to make the enforcer only have to deal with Restricted and Table makes sense.

Addressed all your comments, and last round of unit/integration test looks good, could you please take a look again? Thx.

Hey, this is ready for review again, a few things to look for from last round:

  • The PolicyEnforcer class now takes TableDataSource and ReferenceCountingSegment directly, which should make the call stack more obvious. The exception is also thrown from the enforcer.
  • The enforcer is injected into QueryLifecycleFactory and TaskToolboxFactory, and used in withPolicies and createSegmentMapFunction. Some file changes are purely because the constructor now asks for the enforcer.

Contributor

@gianm gianm left a comment


The new structure of PolicyEnforcer is imo a lot clearer.

* This should be called right before the segment is about to be processed by the query stack, and after
* {@link org.apache.druid.query.planning.ExecutionVertex#createSegmentMapFunction(PolicyEnforcer)}.
*/
default void validateOrElseThrow(PolicyEnforcer policyEnforcer)
Contributor


There is a call missing at BroadcastJoinSegmentMapFnProcessor -- the path MSQ goes down when there is a segment-level join happening. Maybe move the PolicyEnforcer arg from ExecutionVertex to DataSource#createSegmentMapFunction itself?

Or perhaps have BroadcastJoinSegmentMapFnProcessor apply the enforcer on its own. If you go that route, please also add a comment to the javadoc for DataSource#createSegmentMapFunction reminding callers that if they are using this method instead of ExecutionVertex, they need to apply a PolicyEnforcer to the resulting segment.

Contributor Author


Looking at BroadcastJoinSegmentMapFnProcessor, it converts InputNumberDataSource to InlineDataSource, but the enforcer won't have any effect on inline data.

Contributor Author


I managed to run some tests based on BroadcastJoin in MSQ with the enforcer disabled on the DruidQuery side, and it actually catches the security validation failure when I use a TableDataSource as either the left or right child, in the scanning step. The join step (runWithInputChannel) doesn't actually enforce anything, because it uses a FrameSegment.

Contributor


Looking at BroadcastJoinSegmentMapFnProcessor, it converts InputNumberDataSource to InlineDataSource, but the enforcer won't have effects on inline data.

It only inlines if the input number is in the inputNumberToProcessorChannelMap map. The base input won't be in that map, and that base input might be a regular table. I think in the simplest case where a regular table is joined with some subquery result, like select page, q.c from wikipedia, (select count(*) c from "kttm-v2-2019-08-25") q, the segments from wikipedia won't be checked since they'll be using the map function generated here.

I tried setting a breakpoint at validateOrElseThrow(ReferenceCountingSegment segment, Policy policy) and did find that. For MSQ tasks, the breakpoint triggered for kttm-v2-2019-08-25 but not for wikipedia.

For Dart the situation was a little worse; the breakpoint for the validateOrElseThrow(ReferenceCountingSegment segment, Policy policy) method triggered also only for kttm-v2-2019-08-25, but then validate wasn't called because Dart has two layers of ReferenceCountingSegment, so the check baseSegment instanceof QueryableIndexSegment || baseSegment instanceof IncrementalIndexSegment is false.

IMO, the best approach to the first issue (segment map fn created by BroadcastJoinSegmentMapFnProcessor not having enforcement) is to move the PolicyEnforcer into DataSource#createSegmentMapFunction, just like withPolicies. That way we never need to think about whether a particular bare call to DataSource#createSegmentMapFunction is safe or not, because all calls would need the enforcer present.

For the second issue (Dart not validating properly because of double-wrapped segments) -- perhaps it would make sense to move validateOrElseThrow from ReferenceCountingSegment to Segment, and have the impl in ReferenceCountingSegment call getBaseSegment().validateOrElseThrow(policyEnforcer) rather than validateOrElseThrow(this, policyEnforcer)?
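
The wrapper-delegation idea suggested here could look roughly like the sketch below. These are simplified, hypothetical stand-ins, not Druid's real Segment hierarchy; the point is only that wrappers forward validation to their base, so a double-wrapped ReferenceCountingSegment no longer slips past the check.

```java
public class SegmentValidationSketch {
    static final class Policy {}

    interface PolicyEnforcer {
        boolean validate(Policy policy);
    }

    // Requires a policy on every validated segment (RestrictAllTables-style).
    static final class StrictEnforcer implements PolicyEnforcer {
        @Override public boolean validate(Policy policy) { return policy != null; }
    }

    // Simplified Segment: validation is unsupported by default, so unknown
    // segment types fail closed instead of silently skipping validation.
    interface Segment {
        default void validateOrElseThrow(PolicyEnforcer enforcer) {
            throw new UnsupportedOperationException(getClass().getSimpleName());
        }
    }

    // A base segment backed by a regular table, with no policy attached.
    static final class TableSegment implements Segment {
        @Override public void validateOrElseThrow(PolicyEnforcer enforcer) {
            if (!enforcer.validate(null)) {
                throw new IllegalStateException("policy missing on table segment");
            }
        }
    }

    // Reference-counting wrapper delegates to its base, so even
    // Ref(Ref(base)) nesting still reaches the base segment.
    static final class ReferenceCountingSegment implements Segment {
        final Segment base;
        ReferenceCountingSegment(Segment base) { this.base = base; }
        @Override public void validateOrElseThrow(PolicyEnforcer enforcer) {
            base.validateOrElseThrow(enforcer);
        }
    }

    // Restricted wrapper carries the policy; validation checks it directly.
    static final class RestrictedSegment implements Segment {
        final Segment base;
        final Policy policy;
        RestrictedSegment(Segment base, Policy policy) { this.base = base; this.policy = policy; }
        @Override public void validateOrElseThrow(PolicyEnforcer enforcer) {
            if (!enforcer.validate(policy)) {
                throw new IllegalStateException("policy rejected by enforcer");
            }
        }
    }

    static boolean passes(Segment s, PolicyEnforcer e) {
        try { s.validateOrElseThrow(e); return true; }
        catch (RuntimeException ex) { return false; }
    }
}
```

With this shape, a strict enforcer rejects a bare table segment even through two layers of reference counting, while a RestrictedSegment carrying a policy passes.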

Contributor Author


Thanks for the example, I was able to reproduce the error now. It looks like the right child is inlined but has already gone through a scan stage, and the current implementation missed the check on the left child.

I'm thinking we can ask BroadcastJoinSegmentMapFnProcessor to create an ExecutionVertex based on the query, and pass the enforcer in ExecutionVertex's createSegmentMapFunction, like so:

DataSource transformed = inlineChannelData(query.getDataSource());
ExecutionVertex ev = ExecutionVertex.of(query.withDataSource(transformed));
return ev.createSegmentMapFunction(policyEnforcer);

This way we can centralize the enforcer in ExecutionVertex without changing DataSource interface.

For the Dart issue, I didn't know it can wrap two layers of ReferenceCountingSegment; maybe we can make the validation recursive? For a restricted segment, would the structure be

  • Restrict
    • Ref
      • Ref_base
    • policy

or

  • Ref
    • Restrict
      • Ref_base
      • policy

?

Contributor

@gianm gianm Apr 22, 2025


The changes to BroadcastJoinSegmentMapFnProcessor look good to me.

The implementation of PolicyEnforcer#validateOrElseThrow for ReferenceCountingSegment seems a little sketchy still, for a couple reasons:

  • The "else skip validation" seems like a hole for stuff to fall into. Prior to the latest fix, an inner ReferenceCountingSegment did fall into it. I'm wondering if other things could potentially fall in, like extension segment types, new segment types, etc.
  • The branch doing instanceof QueryableIndexSegment || instanceof IncrementalIndexSegment isn't strong enough to detect whether a Segment corresponds to a regular table, because extensions can add other kinds of segments backing tables. This is a specific kind of thing that could fall into the hole mentioned previously.
  • I don't understand why the instanceof RestrictedSegment branch is needed -- is it really possible for RestrictedSegment to wrap another RestrictedSegment? I would think this is impossible given that RestrictedDataSource can only wrap a TableDataSource.

I observe that the DataSource version of validation is cleaner and more robust, and can be because there are two important things DataSource has:

  • DataSource#getChildren exists, so datasource trees can be walked robustly
  • We know that tables correspond to TableDataSource, so it's always possible to identify whether a leaf datasource is a regular table or not.

To have equally robust validation for Segment, I think we would need to add Segment#getChildren to address the first point. For the second point I think we can use a check like this for leaf segments to see if they correspond to regular tables: as(PhysicalSegmentInspector.class) != null.

I tried to think for a while of a robust way to write this code without adding Segment#getChildren and wasn't able to. So I do think we should do that. The default implementation should throw an unsupported operation error.
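
The Segment#getChildren idea above can be sketched like this. Again these are hypothetical, simplified types rather than Druid's real interfaces: the tree is walked robustly via getChildren, every leaf is checked, and the default implementation throws so that new or extension segment types fail closed.

```java
import java.util.Collections;
import java.util.List;

public class SegmentTreeWalkSketch {
    static final class Policy {}

    interface Segment {
        // Default throws: a segment type that forgets to implement this fails
        // closed instead of silently skipping validation.
        default List<Segment> getChildren() {
            throw new UnsupportedOperationException(getClass().getSimpleName());
        }
        // Leaves report their attached policy (null if none).
        default Policy getPolicy() { return null; }
    }

    static final class LeafSegment implements Segment {
        final Policy policy;
        LeafSegment(Policy policy) { this.policy = policy; }
        @Override public List<Segment> getChildren() { return Collections.emptyList(); }
        @Override public Policy getPolicy() { return policy; }
    }

    static final class WrapperSegment implements Segment {
        final Segment delegate;
        WrapperSegment(Segment delegate) { this.delegate = delegate; }
        @Override public List<Segment> getChildren() {
            return Collections.singletonList(delegate);
        }
    }

    // Walk the whole segment tree; under a strict policy, every leaf must
    // carry a policy, no matter how many wrapper layers sit above it.
    static boolean allLeavesHavePolicy(Segment root) {
        List<Segment> children = root.getChildren();
        if (children.isEmpty()) {
            return root.getPolicy() != null;
        }
        for (Segment child : children) {
            if (!allLeavesHavePolicy(child)) {
                return false;
            }
        }
        return true;
    }
}
```

This is what makes the tree walk "robust" in the sense used above: double-wrapping only adds depth, and an unwrapped leaf without a policy is always found.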

Contributor Author


I think SegmentReference could serve the purpose of Segment#getChildren. The default impl is to throw, and every extension (ReferenceCountingSegment, HashJoinSegment, WrappedSegmentReference) needs to implement its own validation (basically asking the delegate to validate).

To solidify, I'm thinking, can we eliminate the use cases when ReferenceCountingSegment is wrapped with another SegmentReference? I think it would be simpler if we know the structure is RestrictedSegment wraps a ReferenceCountingSegment, which wraps a basic segment (QueryableIndexSegment, LookupSegment, etc).

It seems tests using SpecificSegmentsQuerySegmentWalker follow this structure as well.

Contributor


To solidify, I'm thinking, can we eliminate the use cases when ReferenceCountingSegment is wrapped with another SegmentReference? I think it would be simpler if we know the structure is RestrictedSegment wraps a ReferenceCountingSegment, which wraps a basic segment (QueryableIndexSegment, LookupSegment, etc).

I did see double-wrapping (ReferenceCountingSegment on top of ReferenceCountingSegment) with Dart with the query select page, q.c from wikipedia, (select count(*) c from "kttm-v2-2019-08-25") q. However, I doubt this is "necessary". The Dart code could likely be adjusted to not do this. So, if it's helpful, go ahead and restrict things so ReferenceCountingSegment cannot wrap another SegmentReference.

To have equally robust validation for Segment, I think we would need to add Segment#getChildren to address the first point. For the second point I think we can use a check like this for leaf segments to see if they correspond to regular tables: as(PhysicalSegmentInspector.class) != null.

About this (using PhysicalSegmentInspector as a proxy for "Segment represents a regular table"), I thought about it some more and IMO this approach is still not ideal. Nothing in the interface says that it can't be implemented for a non-table, and nothing requires that it is implemented for a table.

I think a nicer idea would be to make Segment#getId nullable, and spec it so that it should return nonnull for regular tables backed by actual segments, null for anything else. This to me would make more sense than the current way getId() works, because currently dummy IDs are needed for Segment that aren't backed by actual segments, which seems odd. I skimmed usages of getId(); most seem to either be running in scenarios where it would always be nonnull anyway, or else are using it to interpolate into log messages (could use toString or asString instead).
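
The nullable getId proposal reduces the "is this a regular table?" test to a null check at each leaf. A minimal sketch, with hypothetical placeholder types (the real change landed later as a follow-up):

```java
public class NullableIdSketch {
    static final class SegmentId {
        final String name;
        SegmentId(String name) { this.name = name; }
    }

    interface Segment {
        // Under the proposal: non-null only for segments backed by an actual
        // table segment; null for inline/lookup/external/frame segments.
        SegmentId getId();
    }

    static final class TableBackedSegment implements Segment {
        @Override public SegmentId getId() { return new SegmentId("some_table_segment"); }
    }

    static final class InlineSegment implements Segment {
        @Override public SegmentId getId() { return null; }
    }

    // A leaf needs a policy exactly when it represents a regular table.
    static boolean requiresPolicy(Segment leaf) {
        return leaf.getId() != null;
    }
}
```

This avoids both problems raised above for PhysicalSegmentInspector: the spec itself says what a non-null id means, and nothing non-table can accidentally qualify.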

Contributor


In the interests of having this PR merged before it grows too large, I suggest doing a super-strict check now -- validate all leaf segments regardless of whether they are actual tables or not. This will essentially mean that validation will always fail on lookups, external, and inline segments, because they will not have policies applied. In a follow-up we can add something to constrain validation to regular-table-backed Segments only, perhaps using this nullable Segment#getId idea.

Contributor Author


Completely agree on the SegmentId approach; it would be a cleaner solution, and we can have a better mapping from the different datasources (table, inline, lookup, external) to segmentId.

I reverted the double-wrapping check in commit 1e6632f, and then moved the no-double-wrapping check (f604abf) to another PR (#17943). Please take a look again!

Contributor

@gianm gianm left a comment


LGTM. Items left for follow-up, as I recall, are:

  • Some way to identify if a Segment corresponds to a regular table (possibly by leveraging getId)
  • Consider restricting double-wrapping of segment references (#17943)

@gianm gianm merged commit 3ef2e5e into apache:master Apr 25, 2025
75 checks passed
@cecemei cecemei deleted the policy branch April 30, 2025 21:57
kfaraz pushed a commit that referenced this pull request May 3, 2025
…able (#17960)

Changes
---------
- Update `Segment.getId()` with `@Nullable` annotation.
- Return null id for `RowBasedSegment` (used by external/lookup/inline segment)
- Return null id for `FrameSegment`, `LookupSegment`
- Update tests

This patch is a follow-up to #17774
@capistrant capistrant added this to the 34.0.0 milestone Jul 22, 2025