Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
111 commits
Select commit Hold shift + click to select a range
ebf8d39
Some debug configs
cecemei Sep 23, 2024
9d01ab6
use postgresql as the default metadata store and set a few debug log
cecemei Oct 17, 2024
1d325dc
Add s3 extension, update local storage directory, use emoji in websit…
cecemei Oct 29, 2024
e2f88ea
Update favicon, easier to find the console tab
cecemei Oct 30, 2024
a2139b7
Add indexer server, add some basic security config, updated historica…
cecemei Dec 11, 2024
654f868
Merge remote-tracking branch 'apache/master' into debug
cecemei Dec 11, 2024
8640a92
Some policy config
cecemei Mar 3, 2025
b086f18
add checks for SegmentMetadataQuery
cecemei Mar 3, 2025
ff21a33
Add thread.sleep for flaky.
cecemei Mar 3, 2025
ad864d6
auth config
cecemei Mar 3, 2025
3b9fabc
format, and remove temp folder rules
cecemei Mar 5, 2025
cf18a05
added NoopPolicyEnforcer and RestrictAllTablesPolicyEnforcer class
cecemei Mar 31, 2025
7e9aef5
Support pushing and streaming task payload for HDFS (#17742)
GWphua Feb 28, 2025
65755d2
Remove usages of deprecated API Files.write() (#17761)
GWphua Mar 2, 2025
4da2b33
Doc: Fix description typo for sqlserver metadata store (#17771)
yurmix Mar 3, 2025
f0468cd
Fix binding of segment metadata cache on CliOverlord (#17772)
kfaraz Mar 3, 2025
dc2780a
Docs: Remove semicolon from example (#17759)
FrankChen021 Mar 3, 2025
b1fcf95
Restrict segment metadata kill query till maxInterval from last kill …
chetanpatidar26 Mar 4, 2025
c3f665d
Update the Supervisor endpoint to not restart the Supervisor if the s…
aho135 Mar 5, 2025
0420632
Reduce noisy coordinator logs (#17779)
kfaraz Mar 5, 2025
5dc7142
Emit time lag from Kafka supervisor (#17735)
ac9817 Mar 5, 2025
33d49a5
fix processed row formatting (#17756)
vogievetsky Mar 5, 2025
ae84dda
Web console: add suggestions for table status filtering. (#17765)
vogievetsky Mar 5, 2025
9416d03
Remove all usages of skife config (#17776)
kfaraz Mar 6, 2025
768cc2b
Add field `taskLimits` to worker select strategies (#16889)
nozjkoitop Mar 6, 2025
06ad7b1
remove NullValueHandlingConfig, NullHandlingModule, NullHandling (#17…
clintropolis Mar 7, 2025
2bb9d3d
Docs: Add SQL query example (#17593)
ektravel Mar 11, 2025
45424ba
More logging cleanup on Overlord (#17780)
kfaraz Mar 12, 2025
c4a4f4d
Remove maven.twttr repo from pom (#17797)
kfaraz Mar 13, 2025
980c492
fix bug (#17791)
kgyrtkirk Mar 13, 2025
e3eeef4
Log query stack traces for DEVELOPER and OPERATOR personas. (#17790)
gianm Mar 13, 2025
e417008
Set useMaxMemoryEstimates=false for MSQ tasks (#17792)
kfaraz Mar 14, 2025
5096d5b
Web console: fix go to task selecting correct task type (#17788)
vogievetsky Mar 14, 2025
73a7bb7
Enable ComponentSuppliers to run queries using Dart (#17787)
kgyrtkirk Mar 17, 2025
f6fbfce
Fix single container config creates failing peon tasks (#17794)
GWphua Mar 17, 2025
f4440b5
Update `k8s-jobs.md` reference (#17805)
emmanuel-ferdman Mar 17, 2025
adb076d
Footer Copyright Year Update (#17751)
omkenge Mar 17, 2025
00b8542
[Revert] Reduce number of metadata transaction retries (#17808)
kfaraz Mar 17, 2025
77aa3fc
Revert "Run JDK 21 workflows with latest JDK. (#17694)" (#17806)
cryptoe Mar 18, 2025
ac692b3
Revert "reject publishing actions with a retriable error code if a ea…
kfaraz Mar 14, 2025
b5a9196
Fix unstable tests after #17787 and dart usage in quidem-ut (#17814)
kgyrtkirk Mar 19, 2025
4bb279f
Use "mix" shuffle spec for target size with nil clusterBy. (#17810)
gianm Mar 19, 2025
e4e123d
Docs: Recommend using runtime property javaOptsArray instead of javaOpts
lfrancke Mar 19, 2025
d87550e
Add minor checks in jetty utils (#17817)
adarshsanjeev Mar 19, 2025
06137a3
CI improvement: Leverage cancelled() instead of always() for CI jobs …
Akshat-Jain Mar 19, 2025
ca03674
Make MSQ tests use the same datasets as other similar tests (#17818)
kgyrtkirk Mar 20, 2025
553c15e
Add unnest tests to quidem (#17825)
weishiuntsai Mar 21, 2025
667ac33
Web console: show loader on aux queries (#17804)
vogievetsky Mar 21, 2025
91b21f1
Use compaction dynamic config to enable compaction supervisors (#17782)
kfaraz Mar 23, 2025
ecd4495
Retry segment publish task actions without holding locks (#17816)
kfaraz Mar 24, 2025
11c6d04
Add the capability to turboload segments onto historicals (#17775)
adarshsanjeev Mar 24, 2025
74573c9
Fix resource leak for GroupBy query merge buffer when query matched r…
maytasm Mar 25, 2025
dc9120d
Add metric and simulation test for turbo loading mode (#17830)
kfaraz Mar 25, 2025
9b2cc56
Update query example (#17811)
ektravel Mar 25, 2025
f22ae42
String util upgrade for jdk9+ (#17795)
GWphua Mar 26, 2025
c4e579b
Documentation Fix (#17826)
GWphua Mar 26, 2025
5e03854
Enable to run quidem tests against multiple configurations; add condi…
kgyrtkirk Mar 26, 2025
7d36e79
Fix failing test in DimensionSchemaUtilsTest (#17832)
jtuglu1 Mar 26, 2025
ec181a7
Improve performance of segment metadata cache on Overlord (#17785)
kfaraz Mar 26, 2025
c17306f
GroupBy: Fix offsets on outer queries. (#17837)
gianm Mar 27, 2025
3e94707
Enable build cache for web-console (#17831)
kgyrtkirk Mar 27, 2025
92d751d
run audit fix (#17836)
vogievetsky Mar 27, 2025
518345d
Do not block task actions on Overlord if segment metadata cache is sy…
kfaraz Mar 27, 2025
21b1ee0
Add json, array, aggregation function tests to quidem (#17842)
weishiuntsai Mar 28, 2025
1b2b242
Optionally include Content-Disposition header in statement results AP…
adarshsanjeev Mar 28, 2025
9910777
Web console: download follow up (#17845)
vogievetsky Mar 29, 2025
63e6c41
Fix flaky unit tests in SegmentBootstrapperTest and KinesisIndexTaskT…
capistrant Mar 29, 2025
c580b2a
Web console: responding to user feedback about the explore view and f…
vogievetsky Mar 29, 2025
2bca650
update TestSegmentCacheManager
cecemei Mar 31, 2025
3682332
Merge commit '3e62978a96' into policy
cecemei Mar 31, 2025
159f50e
revert some style changes
cecemei Mar 31, 2025
0ad463a
validate datasource in CachingClusteredClient as well
cecemei Apr 1, 2025
06bfcce
fix build failure and update style
cecemei Apr 1, 2025
e610403
changes
cecemei Apr 2, 2025
106cfed
Merge branch 'master' into policy
cecemei Apr 2, 2025
fbccd53
add inlineds test
cecemei Apr 2, 2025
ef89d30
add sanity check on segment
cecemei Apr 3, 2025
b2a9ef4
inject policy enforcer
cecemei Apr 3, 2025
92417f3
Merge branch 'master' into policy
cecemei Apr 3, 2025
9063bd0
add PolicyEnforcer binding in MSQTestBase
cecemei Apr 3, 2025
0f14f57
add check in SinkQuerySegmentWalker
cecemei Apr 4, 2025
4344255
more tests in realtime server
cecemei Apr 4, 2025
7658fa2
revert config change in examples
cecemei Apr 4, 2025
71b37f9
Merge branch 'master' into policy
cecemei Apr 4, 2025
e2aa0d2
revert config change in integration test config
cecemei Apr 4, 2025
1e53d82
more tests in msq
cecemei Apr 7, 2025
6415264
another test for unnest in msq
cecemei Apr 7, 2025
4756687
add support for policy from extension
cecemei Apr 7, 2025
fde6f9f
Merge branch 'master' into policy
cecemei Apr 7, 2025
7104472
more test
cecemei Apr 7, 2025
61ad96a
refactor MSQTaskQueryMakerTest to use an instance of MSQTaskQueryMaker
cecemei Apr 7, 2025
04814cf
Add test for JoinDataSource
cecemei Apr 8, 2025
0974d8d
Merge branch 'master' into policy
cecemei Apr 9, 2025
81b630c
Merge branch 'master' into policy
cecemei Apr 10, 2025
d4088b9
add policyEnforcer to withPolicies, and validate segment after segmen…
cecemei Apr 10, 2025
ce841d1
fix binding and test
cecemei Apr 11, 2025
a7958eb
add policy module
cecemei Apr 11, 2025
3586ee1
mock planner toolbox
cecemei Apr 11, 2025
5658bf7
revert some injection
cecemei Apr 11, 2025
cb156c7
add test for stream appenderator
cecemei Apr 11, 2025
7380213
update PolicyEnforcer to take ReferenceCountingSegment as param
cecemei Apr 11, 2025
5904a30
update to QueryLifecycleTest
cecemei Apr 11, 2025
ebd8756
update to SqlTestFramework
cecemei Apr 11, 2025
1874b17
Merge branch 'master' into policy
cecemei Apr 11, 2025
1ec3d56
Merge branch 'master' into policy
cecemei Apr 16, 2025
b929951
pass enforcer to BroadcastJoinSegmentMapFnProcessor and add test. Pol…
cecemei Apr 17, 2025
c975f4c
Merge branch 'master' into policy
cecemei Apr 17, 2025
1e6632f
ReferenceCountingSegment is not allowed to wrap with a SegmentReferen…
cecemei Apr 23, 2025
25ffb7c
Merge remote-tracking branch 'cecemei/debug' into policy
cecemei Apr 23, 2025
f604abf
moving ReferenceCountingSegment to another pr
cecemei Apr 23, 2025
22327d3
Revert "Merge remote-tracking branch 'cecemei/debug' into policy"
cecemei Apr 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.druid.math.expr.ExpressionProcessing;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.policy.NoopPolicyEnforcer;
import org.apache.druid.segment.AutoTypeColumnSchema;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
Expand Down Expand Up @@ -202,6 +203,7 @@ public void setup() throws JsonProcessingException
CalciteTests.createJoinableFactoryWrapper(),
CatalogResolver.NULL_RESOLVER,
new AuthConfig(),
NoopPolicyEnforcer.instance(),
new DruidHookDispatcher()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.druid.query.aggregation.datasketches.theta.sql.ThetaSketchEstimateOperatorConversion;
import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchModule;
import org.apache.druid.query.lookup.LookupExtractor;
import org.apache.druid.query.policy.NoopPolicyEnforcer;
import org.apache.druid.segment.IncrementalIndexSegment;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.PhysicalSegmentInspector;
Expand Down Expand Up @@ -452,6 +453,7 @@ public static Pair<PlannerFactory, SqlEngine> createSqlSystem(
new JoinableFactoryWrapper(QueryFrameworkUtils.createDefaultJoinableFactory(injector)),
CatalogResolver.NULL_RESOLVER,
new AuthConfig(),
NoopPolicyEnforcer.instance(),
new DruidHookDispatcher()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.policy.NoopPolicyEnforcer;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
import org.apache.druid.segment.generator.GeneratorSchemaInfo;
Expand Down Expand Up @@ -129,6 +130,7 @@ public void setup()
CalciteTests.createJoinableFactoryWrapper(),
CatalogResolver.NULL_RESOLVER,
new AuthConfig(),
NoopPolicyEnforcer.instance(),
new DruidHookDispatcher()
);
groupByQuery = GroupByQuery
Expand Down
4 changes: 4 additions & 0 deletions extensions-core/multi-stage-query/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-testlib</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.SegmentWrangler;
Expand Down Expand Up @@ -78,6 +79,12 @@ public DartFrameContext(
this.storageParameters = storageParameters;
}

@Override
public PolicyEnforcer policyEnforcer()
{
return workerContext.policyEnforcer();
}

@Override
public SegmentWrangler segmentWrangler()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.server.DruidNode;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
Expand All @@ -61,6 +62,7 @@ public class DartWorkerContext implements WorkerContext
private final WorkerId workerId;
private final DruidNode selfNode;
private final ObjectMapper jsonMapper;
private final PolicyEnforcer policyEnforcer;
private final Injector injector;
private final DartWorkerClient workerClient;
private final DruidProcessingConfig processingConfig;
Expand All @@ -84,6 +86,7 @@ public class DartWorkerContext implements WorkerContext
final String controllerHost,
final DruidNode selfNode,
final ObjectMapper jsonMapper,
final PolicyEnforcer policyEnforcer,
final Injector injector,
final DartWorkerClient workerClient,
final DruidProcessingConfig processingConfig,
Expand All @@ -102,6 +105,7 @@ public class DartWorkerContext implements WorkerContext
this.workerId = WorkerId.fromDruidNode(selfNode, queryId);
this.selfNode = selfNode;
this.jsonMapper = jsonMapper;
this.policyEnforcer = policyEnforcer;
this.injector = injector;
this.workerClient = workerClient;
this.processingConfig = processingConfig;
Expand Down Expand Up @@ -133,6 +137,12 @@ public ObjectMapper jsonMapper()
return jsonMapper;
}

@Override
public PolicyEnforcer policyEnforcer()
{
return policyEnforcer;
}

@Override
public Injector injector()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.segment.SegmentWrangler;
import org.apache.druid.server.DruidNode;
Expand All @@ -52,6 +53,7 @@ public class DartWorkerFactoryImpl implements DartWorkerFactory
private final DruidNode selfNode;
private final ObjectMapper jsonMapper;
private final ObjectMapper smileMapper;
private final PolicyEnforcer policyEnforcer;
private final Injector injector;
private final ServiceClientFactory serviceClientFactory;
private final DruidProcessingConfig processingConfig;
Expand All @@ -67,6 +69,7 @@ public DartWorkerFactoryImpl(
@Self DruidNode selfNode,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper smileMapper,
PolicyEnforcer policyEnforcer,
Injector injector,
@EscalatedGlobal ServiceClientFactory serviceClientFactory,
DruidProcessingConfig processingConfig,
Expand All @@ -81,6 +84,7 @@ public DartWorkerFactoryImpl(
this.selfNode = selfNode;
this.jsonMapper = jsonMapper;
this.smileMapper = smileMapper;
this.policyEnforcer = policyEnforcer;
this.injector = injector;
this.serviceClientFactory = serviceClientFactory;
this.processingConfig = processingConfig;
Expand All @@ -100,6 +104,7 @@ public Worker build(String queryId, String controllerHost, File tempDir, QueryCo
controllerHost,
selfNode,
jsonMapper,
policyEnforcer,
injector,
new DartWorkerClientImpl(queryId, serviceClientFactory, smileMapper, null),
processingConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.msq.kernel.FrameProcessorFactory;
import org.apache.druid.msq.kernel.WorkOrder;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.server.DruidNode;

import java.io.File;
Expand All @@ -51,6 +52,8 @@ public interface WorkerContext

ObjectMapper jsonMapper();

PolicyEnforcer policyEnforcer();

// Using an Injector directly because tasks do not have a way to provide their own Guice modules.
Injector injector();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.SegmentWrangler;
Expand Down Expand Up @@ -73,6 +74,12 @@ public IndexerFrameContext(
this.dataServerQueryHandlerFactory = dataServerQueryHandlerFactory;
}

@Override
public PolicyEnforcer policyEnforcer()
{
return context.policyEnforcer();
}

@Override
public SegmentWrangler segmentWrangler()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.rpc.ServiceClientFactory;
import org.apache.druid.rpc.ServiceLocations;
import org.apache.druid.rpc.ServiceLocator;
Expand Down Expand Up @@ -191,6 +192,12 @@ public ObjectMapper jsonMapper()
return toolbox.getJsonMapper();
}

@Override
public PolicyEnforcer policyEnforcer()
{
return toolbox.getPolicyEnforcer();
}

@Override
public Injector injector()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.msq.exec.WorkerStorageParameters;
import org.apache.druid.msq.querykit.DataSegmentProvider;
import org.apache.druid.query.groupby.GroupingEngine;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.SegmentWrangler;
Expand All @@ -44,6 +45,8 @@
*/
public interface FrameContext extends Closeable
{
PolicyEnforcer policyEnforcer();

SegmentWrangler segmentWrangler();

GroupingEngine groupingEngine();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
final ProcessorManager processorManager;

if (segmentMapFnProcessor == null) {
final Function<SegmentReference, SegmentReference> segmentMapFn =
query.getDataSource().createSegmentMapFunction(query);
final Function<SegmentReference, SegmentReference> segmentMapFn = ExecutionVertex.of(query)
.createSegmentMapFunction(frameContext.policyEnforcer());
processorManager = processorManagerFn.apply(ImmutableList.of(segmentMapFn));
} else {
processorManager = new ChainedProcessorManager<>(ProcessorManagers.of(() -> segmentMapFnProcessor), processorManagerFn);
Expand Down Expand Up @@ -342,7 +342,7 @@ private FrameProcessor<Function<SegmentReference, SegmentReference>> makeSegment
if (broadcastInputs.isEmpty()) {
if (ExecutionVertex.of(query).isSegmentMapFunctionExpensive()) {
// Joins may require significant computation to compute the segmentMapFn. Offload it to a processor.
return new SimpleSegmentMapFnProcessor(query);
return new SimpleSegmentMapFnProcessor(query, frameContext.policyEnforcer());
} else {
// Non-joins are expected to have cheap-to-compute segmentMapFn. Do the computation in the factory thread,
// without offloading to a processor.
Expand All @@ -351,6 +351,7 @@ private FrameProcessor<Function<SegmentReference, SegmentReference>> makeSegment
} else {
return BroadcastJoinSegmentMapFnProcessor.create(
query,
frameContext.policyEnforcer(),
broadcastInputs,
frameContext.memoryParameters().getBroadcastBufferMemory()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.query.JoinAlgorithm;
import org.apache.druid.query.Query;
import org.apache.druid.query.planning.ExecutionVertex;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.SegmentReference;
Expand All @@ -64,6 +66,7 @@
public class BroadcastJoinSegmentMapFnProcessor implements FrameProcessor<Function<SegmentReference, SegmentReference>>
{
private final Query<?> query;
private final PolicyEnforcer policyEnforcer;
private final Int2IntMap inputNumberToProcessorChannelMap;
private final List<ReadableFrameChannel> channels;
private final List<FrameReader> channelReaders;
Expand All @@ -87,13 +90,15 @@ public class BroadcastJoinSegmentMapFnProcessor implements FrameProcessor<Functi
*/
public BroadcastJoinSegmentMapFnProcessor(
final Query<?> query,
final PolicyEnforcer policyEnforcer,
final Int2IntMap inputNumberToProcessorChannelMap,
final List<ReadableFrameChannel> channels,
final List<FrameReader> channelReaders,
final long memoryReservedForBroadcastJoin
)
{
this.query = query;
this.policyEnforcer = policyEnforcer;
this.inputNumberToProcessorChannelMap = inputNumberToProcessorChannelMap;
this.channels = channels;
this.channelReaders = channelReaders;
Expand All @@ -117,6 +122,7 @@ public BroadcastJoinSegmentMapFnProcessor(
*/
public static BroadcastJoinSegmentMapFnProcessor create(
final Query<?> query,
final PolicyEnforcer policyEnforcer,
final Int2ObjectMap<ReadableInput> sideChannels,
final long memoryReservedForBroadcastJoin
)
Expand All @@ -134,6 +140,7 @@ public static BroadcastJoinSegmentMapFnProcessor create(

return new BroadcastJoinSegmentMapFnProcessor(
query,
policyEnforcer,
inputNumberToProcessorChannelMap,
inputChannels,
channelReaders,
Expand Down Expand Up @@ -193,7 +200,8 @@ private void addFrame(final int channelNumber, final Frame frame)

private Function<SegmentReference, SegmentReference> createSegmentMapFunction()
{
return inlineChannelData(query.getDataSource()).createSegmentMapFunction(query);
DataSource transformed = inlineChannelData(query.getDataSource());
return ExecutionVertex.of(query.withDataSource(transformed)).createSegmentMapFunction(policyEnforcer);
}

DataSource inlineChannelData(final DataSource originalDataSource)
Expand Down Expand Up @@ -230,7 +238,6 @@ DataSource inlineChannelData(final DataSource originalDataSource)
* broadcast tables.
*
* @param readableInputs all readable input channel numbers, including non-side-channels
*
* @return whether side channels have been fully read
*/
boolean buildBroadcastTablesIncrementally(final IntSet readableInputs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.frame.processor.ReturnOrAwait;
import org.apache.druid.query.Query;
import org.apache.druid.query.planning.ExecutionVertex;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.segment.SegmentReference;

import java.util.Collections;
Expand All @@ -44,10 +45,13 @@
public class SimpleSegmentMapFnProcessor implements FrameProcessor<Function<SegmentReference, SegmentReference>>
{
private final Query<?> query;
private final PolicyEnforcer policyEnforcer;

public SimpleSegmentMapFnProcessor(final Query<?> query)
public SimpleSegmentMapFnProcessor(final Query<?> query,
final PolicyEnforcer policyEnforcer)
{
this.query = query;
this.policyEnforcer = policyEnforcer;
}

@Override
Expand All @@ -66,7 +70,7 @@ public List<WritableFrameChannel> outputChannels()
public ReturnOrAwait<Function<SegmentReference, SegmentReference>> runIncrementally(final IntSet readableInputs)
{
ExecutionVertex ev = ExecutionVertex.of(query);
return ReturnOrAwait.returnObject(ev.createSegmentMapFunction());
return ReturnOrAwait.returnObject(ev.createSegmentMapFunction(policyEnforcer));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.policy.NoopPolicyEnforcer;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.QueryStackTests;
import org.apache.druid.server.ResponseContextConfig;
Expand Down Expand Up @@ -215,6 +216,7 @@ public void register(ControllerHolder holder)
CalciteTests.createJoinableFactoryWrapper(),
CatalogResolver.NULL_RESOLVER,
new AuthConfig(),
NoopPolicyEnforcer.instance(),
new DruidHookDispatcher()
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.policy.NoopPolicyEnforcer;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.sql.calcite.planner.CalciteRulesManager;
import org.apache.druid.sql.calcite.planner.CatalogResolver;
Expand Down Expand Up @@ -77,6 +78,7 @@ NamedViewSchema.NAME, new NamedViewSchema(EasyMock.createMock(ViewSchema.class))
new CalciteRulesManager(ImmutableSet.of()),
CalciteTests.TEST_AUTHORIZER_MAPPER,
AuthConfig.newBuilder().build(),
NoopPolicyEnforcer.instance(),
new DruidHookDispatcher()
);
final NativeSqlEngine engine = CalciteTests.createMockSqlEngine(
Expand Down
Loading
Loading