@@ -22,6 +22,8 @@
import org.apache.druid.catalog.guice.CatalogClientModule;
import org.apache.druid.catalog.guice.CatalogCoordinatorModule;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.indexer.CompactionEngine;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
import org.apache.druid.indexing.common.task.IndexTask;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.overlord.Segments;
@@ -38,6 +40,7 @@
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.InlineSchemaDataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskQueryTuningConfig;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -51,9 +54,10 @@
import org.hamcrest.Matchers;
import org.joda.time.Period;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.List;
import java.util.Map;

/**
@@ -78,6 +82,14 @@ public EmbeddedDruidCluster createCluster()
return EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper()
.useLatchableEmitter()
.useDefaultTimeoutForLatchableEmitter(600)
.addCommonProperty("druid.auth.authorizers", "[\"allowAll\"]")
.addCommonProperty("druid.auth.authorizer.allowAll.type", "allowAll")
.addCommonProperty("druid.auth.authorizer.allowAll.policy.type", "noRestriction")
.addCommonProperty(
"druid.policy.enforcer.allowedPolicies",
"[\"org.apache.druid.query.policy.NoRestrictionPolicy\"]"
)
.addCommonProperty("druid.policy.enforcer.type", "restrictAllTables")
.addExtensions(
CatalogClientModule.class,
CatalogCoordinatorModule.class,
@@ -95,18 +107,21 @@ public EmbeddedDruidCluster createCluster()
.addServer(new EmbeddedRouter());
}

@BeforeAll
public void enableCompactionSupervisors()

private void configureCompaction(CompactionEngine compactionEngine)
{
final UpdateResponse updateResponse = cluster.callApi().onLeaderOverlord(
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 100, null, true, null))
o -> o.updateClusterCompactionConfig(new ClusterCompactionConfig(1.0, 100, null, true, compactionEngine))
);
Assertions.assertTrue(updateResponse.isSuccess());
}

@Test
public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineConfig()
@MethodSource("getEngine")
@ParameterizedTest(name = "compactionEngine={0}")
public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineConfig(CompactionEngine compactionEngine)
{
configureCompaction(compactionEngine);

// Ingest data at DAY granularity and verify
runIngestionAtGranularity(
"DAY",
@@ -125,6 +140,29 @@ public void test_ingestDayGranularity_andCompactToMonthGranularity_withInlineCon
.withGranularitySpec(
new UserCompactionTaskGranularityConfig(Granularities.MONTH, null, null)
)
.withTuningConfig(
new UserCompactionTaskQueryTuningConfig(
null,
null,
null,
null,
null,
new DimensionRangePartitionsSpec(null, 5000, List.of("item"), false),
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
)
.build();

runCompactionWithSpec(compactionConfig);
@@ -192,4 +230,9 @@ private void runIngestionAtGranularity(
.withId(IdUtils.getRandomId());
cluster.callApi().runTask(task, overlord);
}

public static List<CompactionEngine> getEngine()
{
return List.of(CompactionEngine.NATIVE, CompactionEngine.MSQ);
}
}
@@ -99,7 +99,7 @@ public void setUp()

httpRequest = EasyMock.createStrictMock(HttpServletRequest.class);
authorizerMapper = EasyMock.createStrictMock(AuthorizerMapper.class);
EasyMock.expect(authorizerMapper.getAuthorizer("druid")).andReturn(new AllowAllAuthorizer()).anyTimes();
EasyMock.expect(authorizerMapper.getAuthorizer("druid")).andReturn(new AllowAllAuthorizer(null)).anyTimes();

taskMaster = EasyMock.createStrictMock(TaskMaster.class);
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).anyTimes();
@@ -47,6 +47,7 @@
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.msq.indexing.destination.DataSourceMSQDestination;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Druids;
import org.apache.druid.query.Order;
import org.apache.druid.query.OrderBy;
@@ -63,6 +64,7 @@
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
import org.apache.druid.query.policy.PolicyEnforcer;
import org.apache.druid.query.spec.MultipleIntervalSegmentSpec;
import org.apache.druid.segment.VirtualColumn;
import org.apache.druid.segment.VirtualColumns;
@@ -73,6 +75,14 @@
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizationResult;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Escalator;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.druid.server.security.ResourceType;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.planner.ColumnMapping;
import org.apache.druid.sql.calcite.planner.ColumnMappings;
@@ -262,7 +272,9 @@ public List<MSQControllerTask> createMsqControllerTasks(
} else {
query = buildScanQuery(compactionTask, interval, dataSchema, inputColToVirtualCol);
}

QueryContext compactionTaskContext = new QueryContext(compactionTask.getContext());

DataSourceMSQDestination destination = buildMSQDestination(compactionTask, dataSchema);

boolean isReindex = MSQControllerTask.isReplaceInputDataSourceTask(query, destination);
@@ -370,6 +382,28 @@ private static RowSignature getRowSignature(DataSchema dataSchema)
return rowSignatureBuilder.build();
}

/**
* Creates a {@link DataSource} for the given table name, obtains the escalated 'system' {@link AuthorizationResult}
* via the {@link Escalator} and {@link AuthorizerMapper}, and applies any resulting
* {@link org.apache.druid.query.policy.Policy} to the data source using {@link DataSource#withPolicies(Map, PolicyEnforcer)}.
*/
private DataSource getInputDataSource(String name)
{
TableDataSource dataSource = new TableDataSource(name);
final Escalator escalator = injector.getInstance(Escalator.class);
if (escalator != null) {
final AuthorizerMapper authorizerMapper = injector.getInstance(AuthorizerMapper.class);
final PolicyEnforcer policyEnforcer = injector.getInstance(PolicyEnforcer.class);
final AuthorizationResult authResult = AuthorizationUtils.authorizeAllResourceActions(
escalator.createEscalatedAuthenticationResult(),
List.of(new ResourceAction(new Resource(name, ResourceType.DATASOURCE), Action.READ)),
authorizerMapper
);
return dataSource.withPolicies(authResult.getPolicyMap(), policyEnforcer);
}
return dataSource;
}

private static List<DimensionSpec> getAggregateDimensions(
DataSchema dataSchema,
Map<String, VirtualColumn> inputColToVirtualCol
@@ -457,7 +491,7 @@ private static Map<String, Object> buildQueryContext(
return queryContext;
}

private static Query<?> buildScanQuery(
private Query<?> buildScanQuery(
CompactionTask compactionTask,
Interval interval,
DataSchema dataSchema,
@@ -467,7 +501,7 @@ private static Query<?> buildScanQuery(
RowSignature rowSignature = getRowSignature(dataSchema);
VirtualColumns virtualColumns = VirtualColumns.create(new ArrayList<>(inputColToVirtualCol.values()));
Druids.ScanQueryBuilder scanQueryBuilder = new Druids.ScanQueryBuilder()
.dataSource(dataSchema.getDataSource())
.dataSource(getInputDataSource(dataSchema.getDataSource()))
.columns(rowSignature.getColumnNames())
.virtualColumns(virtualColumns)
.columnTypes(rowSignature.getColumnTypes())
@@ -618,7 +652,7 @@ private Query<?> buildGroupByQuery(
.collect(Collectors.toList());

GroupByQuery.Builder builder = new GroupByQuery.Builder()
.setDataSource(new TableDataSource(compactionTask.getDataSource()))
.setDataSource(getInputDataSource(compactionTask.getDataSource()))
.setVirtualColumns(virtualColumns)
.setDimFilter(dimFilter)
.setGranularity(new AllGranularity())
@@ -32,6 +32,9 @@
import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.guice.StartupInjectorBuilder;
import org.apache.druid.guice.security.EscalatorModule;
import org.apache.druid.guice.security.PolicyModule;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
import org.apache.druid.indexer.partitions.DimensionRangePartitionsSpec;
@@ -41,6 +44,7 @@
import org.apache.druid.indexing.common.task.CompactionIntervalSpec;
import org.apache.druid.indexing.common.task.CompactionTask;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
import org.apache.druid.initialization.CoreInjectorBuilder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@@ -74,6 +78,7 @@
import org.apache.druid.segment.transform.CompactionTransformSpec;
import org.apache.druid.segment.transform.TransformSpec;
import org.apache.druid.server.coordinator.CompactionConfigValidationResult;
import org.apache.druid.server.initialization.AuthorizerMapperModule;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.joda.time.Interval;
import org.junit.Assert;
@@ -143,27 +148,18 @@ public class MSQCompactionRunnerTest
ImmutableSet.of(MV_STRING_DIMENSION.getName())
)
);
private static final Map<Interval, DataSchema> INTERVAL_DATASCHEMAS_WITH_PROJECTION = ImmutableMap.of(
COMPACTION_INTERVAL,
new CombinedDataSchema(
DATA_SOURCE,
new TimestampSpec(TIMESTAMP_COLUMN, null, null),
new DimensionsSpec(DIMENSIONS),
null,
null,
null,
ImmutableList.of(PROJECTION_SPEC),
ImmutableSet.of(MV_STRING_DIMENSION.getName())
)
);
private static final ObjectMapper JSON_MAPPER = new DefaultObjectMapper();
private static final AggregatorFactory AGG1 = new CountAggregatorFactory("agg_0");
private static final AggregatorFactory AGG2 = new LongSumAggregatorFactory("sum_added", "sum_added");
private static final List<AggregatorFactory> AGGREGATORS = ImmutableList.of(AGG1, AGG2);
private static final MSQCompactionRunner MSQ_COMPACTION_RUNNER = new MSQCompactionRunner(
JSON_MAPPER,
TestExprMacroTable.INSTANCE,
null
new CoreInjectorBuilder(new StartupInjectorBuilder().forTests().build()).addModules(
new EscalatorModule(),
new AuthorizerMapperModule(),
new PolicyModule()
).build()
);
private static final List<String> PARTITION_DIMENSIONS = Collections.singletonList(STRING_DIMENSION.getName());

@@ -19,34 +19,37 @@

package org.apache.druid.guice.security;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.name.Named;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.server.security.AllowAllAuthorizer;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.Authorizer;

public class AuthorizerModule implements Module
import java.util.List;

public class AuthorizerModule implements DruidModule
{
@Override
public void configure(Binder binder)
{
final MapBinder<String, Authorizer> authorizerMapBinder = PolyBind.optionBinder(
binder,
Key.get(Authorizer.class)
}

@Override
public List<? extends Module> getJacksonModules()
{
return List.of(
new SimpleModule().registerSubtypes(AllowAllAuthorizer.class)
);
authorizerMapBinder.addBinding(AuthConfig.ALLOW_ALL_NAME).to(AllowAllAuthorizer.class).in(LazySingleton.class);
}

@Provides
@Named(AuthConfig.ALLOW_ALL_NAME)
public Authorizer getAuthorizer()
{
return new AllowAllAuthorizer();
return new AllowAllAuthorizer(null);
}
}
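
As a minimal sketch of how the reworked module is consumed, the snippet below builds a bare Guice injector with only AuthorizerModule installed and retrieves the @Named(AuthConfig.ALLOW_ALL_NAME) authorizer that the new @Provides method supplies. In a real Druid server this module is installed alongside the rest of the security modules, so the standalone injector here is only an illustration; the class name is made up for the example.

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.name.Names;
import org.apache.druid.guice.security.AuthorizerModule;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.Authorizer;

public class AuthorizerModuleSketch
{
  public static void main(String[] args)
  {
    // configure() is now empty, so the module can be installed on its own for this sketch.
    final Injector injector = Guice.createInjector(new AuthorizerModule());

    // The @Provides method returns a policy-free authorizer, i.e. new AllowAllAuthorizer(null).
    final Authorizer allowAll = injector.getInstance(
        Key.get(Authorizer.class, Names.named(AuthConfig.ALLOW_ALL_NAME))
    );
    System.out.println(allowAll);
  }
}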
@@ -85,7 +85,7 @@ public AuthorizerMapper get()

// Default is allow all
if (authorizers == null) {
AllowAllAuthorizer allowAllAuthorizer = new AllowAllAuthorizer();
AllowAllAuthorizer allowAllAuthorizer = new AllowAllAuthorizer(null);
authorizerMap.put(AuthConfig.ALLOW_ALL_NAME, allowAllAuthorizer);

return new AuthorizerMapper(null)
@@ -19,11 +19,36 @@

package org.apache.druid.server.security;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.query.policy.Policy;

import javax.annotation.Nullable;

public class AllowAllAuthorizer implements Authorizer
{
@Nullable
private final Policy policy;

@JsonCreator
public AllowAllAuthorizer(
@JsonProperty("policy") @Nullable Policy policy
)
{
this.policy = policy;
}

@Override
public Access authorize(AuthenticationResult authenticationResult, Resource resource, Action action)
{
if (shouldApplyPolicy(resource, action)) {
return Access.allowWithRestriction(policy);
}
return Access.OK;
}

private boolean shouldApplyPolicy(Resource resource, Action action)
{
return policy != null && AuthorizationUtils.shouldApplyPolicy(resource, action);
}
}
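
For illustration, a hedged usage sketch of the policy-aware authorizer. The AuthenticationResult constructor arguments, the NoRestrictionPolicy.instance() factory, and the class name are assumptions made for this example rather than anything shown in the diff; any Policy implementation would do.

import org.apache.druid.query.policy.NoRestrictionPolicy;
import org.apache.druid.server.security.Access;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AllowAllAuthorizer;
import org.apache.druid.server.security.AuthenticationResult;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceType;

public class AllowAllAuthorizerSketch
{
  public static void main(String[] args)
  {
    // Assumed constructor arguments: identity, authorizer name, authenticated-by, context.
    final AuthenticationResult identity = new AuthenticationResult("druid_system", "allowAll", null, null);
    final Resource table = new Resource("wikipedia", ResourceType.DATASOURCE);

    // No policy: identical to the old behaviour, every check returns Access.OK.
    final Access open = new AllowAllAuthorizer(null).authorize(identity, table, Action.READ);

    // With a policy: datasource reads are still allowed, but the Access returned by
    // Access.allowWithRestriction(policy) carries the policy for downstream enforcement.
    // NoRestrictionPolicy.instance() is assumed to exist; it stands in for any configured Policy.
    final Access restricted = new AllowAllAuthorizer(NoRestrictionPolicy.instance())
        .authorize(identity, table, Action.READ);

    System.out.println(open.isAllowed() + " / " + restricted.isAllowed()); // both allowed
  }
}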
@@ -36,7 +36,7 @@ public class AuthTestUtils
@Override
public Authorizer getAuthorizer(String name)
{
return new AllowAllAuthorizer();
return new AllowAllAuthorizer(null);
}
};
}