From 46b1921ef58e598e8e5c27fc92e8e3e05b4b54cb Mon Sep 17 00:00:00 2001 From: siddhant2001 Date: Fri, 27 Sep 2024 13:23:27 +0530 Subject: [PATCH 1/5] create masking config --- .../service/HandlerScopedMaskingConfig.java | 170 ++++++++++++++++++ .../pinot/PinotBasedRequestHandler.java | 11 ++ .../src/test/resources/application.conf | 21 +++ .../resources/configs/common/application.conf | 21 +++ 4 files changed, 223 insertions(+) create mode 100644 query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java diff --git a/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java b/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java new file mode 100644 index 00000000..9ad9fabd --- /dev/null +++ b/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java @@ -0,0 +1,170 @@ +package org.hypertrace.core.query.service; + +import com.typesafe.config.Config; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.stream.Collectors; +import lombok.Value; +import lombok.experimental.NonFinal; +import lombok.extern.slf4j.Slf4j; +import org.apache.pinot.client.ResultSet; +import org.hypertrace.core.query.service.api.Row; + +@Slf4j +public class HandlerScopedMaskingConfig { + private static final String TENANT_SCOPED_MASKS_CONFIG_KEY = "tenantScopedMaskingCriteria"; + private final Map> tenantToMaskValuesMap; + private final String tenantColumnName; + private final String timeFilterColumnName; + private int tenantColumnIndex; + private int timeFilterColumnIndex; + private final HashMap columnNameToIndexMap = new HashMap<>(); + + List getMaskValues(String tenantId) { + if (tenantToMaskValuesMap.containsKey(tenantId)) return tenantToMaskValuesMap.get(tenantId); + + return new ArrayList<>(); + } + + public HandlerScopedMaskingConfig( + Config config, Optional timeFilterColumnName, String tenantColumnName) { + if (config.hasPath(TENANT_SCOPED_MASKS_CONFIG_KEY)) { + this.tenantToMaskValuesMap = + config.getConfigList(TENANT_SCOPED_MASKS_CONFIG_KEY).stream() + .map(maskConfig -> new TenantMasks(maskConfig, timeFilterColumnName)) + .collect( + Collectors.toMap( + tenantFilters -> tenantFilters.tenantId, + tenantFilters -> tenantFilters.maskValues)); + } else { + this.tenantToMaskValuesMap = Collections.emptyMap(); + } + + this.tenantColumnName = tenantColumnName; + this.timeFilterColumnName = timeFilterColumnName.orElse(null); + } + + public void parseColumns(ResultSet resultSet) { + timeFilterColumnIndex = -1; + tenantColumnIndex = -1; + columnNameToIndexMap.clear(); + + for (int colIdx = 0; colIdx < resultSet.getColumnCount(); colIdx++) { + if (Objects.equals(this.tenantColumnName, resultSet.getColumnName(colIdx))) { + this.tenantColumnIndex = colIdx; + } else if (Objects.equals(this.timeFilterColumnName, resultSet.getColumnName(colIdx))) { + this.timeFilterColumnIndex = colIdx; + } + + columnNameToIndexMap.put(resultSet.getColumnName(colIdx), colIdx); + } + } + + public Row mask(Row row) { + if (this.tenantColumnIndex == -1 || this.timeFilterColumnIndex == -1) { + return row; + } + List masks = + getMaskValues(row.getColumn(this.tenantColumnIndex).getString()); + + Row.Builder maskedRowBuilder = Row.newBuilder(row); + + for (MaskValuesForTimeRange mask : masks) { + boolean toBeMasked = true; + if (mask.getEndTimeMillis().isPresent()) { + if (mask.getEndTimeMillis().get() < row.getColumn(this.timeFilterColumnIndex).getLong()) { + toBeMasked = false; + } + } + if (mask.getStartTimeMillis().isPresent()) { + if (mask.getStartTimeMillis().get() > row.getColumn(this.timeFilterColumnIndex).getLong()) { + toBeMasked = false; + } + } + + if (toBeMasked) { + for (String columnName : mask.maskValues.getColumnToMaskedValue().keySet()) { + int colIdx = columnNameToIndexMap.get(columnName); + org.hypertrace.core.query.service.api.Value value = + org.hypertrace.core.query.service.api.Value.newBuilder() + .setString(mask.maskValues.getColumnToMaskedValue().get(columnName)) + .build(); + maskedRowBuilder.setColumn(colIdx, value); + } + } + } + + return maskedRowBuilder.build(); + } + + @Value + @NonFinal + private class TenantMasks { + private static final String TENANT_ID_CONFIG_KEY = "tenantId"; + private static final String TIME_RANGE_AND_MASK_VALUES_CONFIG_KEY = "timeRangeAndMaskValues"; + String tenantId; + String startTimeAttributeName; + List maskValues; + + private TenantMasks(Config config, Optional startTimeAttributeName) { + this.tenantId = config.getString(TENANT_ID_CONFIG_KEY); + this.startTimeAttributeName = startTimeAttributeName.orElse(null); + this.maskValues = + config.getConfigList(TIME_RANGE_AND_MASK_VALUES_CONFIG_KEY).stream() + .map(MaskValuesForTimeRange::new) + .collect(Collectors.toList()); + } + } + + @Value + private class MaskValues { + Map columnToMaskedValue; + + MaskValues(Map columnToMaskedValue) { + this.columnToMaskedValue = columnToMaskedValue; + } + } + + @Value + @NonFinal + class MaskValuesForTimeRange { + private static final String START_TIME_CONFIG_PATH = "startTimeMillis"; + private static final String END_TIME_CONFIG_PATH = "endTimeMillis"; + private static final String MASK_VALUE_CONFIG_PATH = "maskValues"; + private static final String ATTRIBUTE_ID_CONFIG_PATH = "attributeId"; + private static final String MASKED_VALUE_CONFIG_PATH = "maskedValue"; + Optional startTimeMillis; + Optional endTimeMillis; + MaskValues maskValues; + + private MaskValuesForTimeRange(Config config) { + if (config.hasPath(START_TIME_CONFIG_PATH) && config.hasPath(END_TIME_CONFIG_PATH)) { + this.startTimeMillis = Optional.of(config.getLong(START_TIME_CONFIG_PATH)); + this.endTimeMillis = Optional.of(config.getLong(END_TIME_CONFIG_PATH)); + } else { + startTimeMillis = Optional.empty(); + endTimeMillis = Optional.empty(); + } + if (config.hasPath(MASK_VALUE_CONFIG_PATH)) { + List maskedValuesList = + new ArrayList<>(config.getConfigList(MASK_VALUE_CONFIG_PATH)); + HashMap maskedValuesMap = new HashMap<>(); + maskedValuesList.forEach( + maskedValue -> { + maskedValuesMap.put( + maskedValue.getString(ATTRIBUTE_ID_CONFIG_PATH), + maskedValue.getString(MASKED_VALUE_CONFIG_PATH)); + }); + + maskValues = new MaskValues(maskedValuesMap); + } else { + maskValues = new MaskValues(new HashMap<>()); + } + } + } +} diff --git a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java index a9f55115..5d8f241a 100644 --- a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java +++ b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java @@ -28,6 +28,7 @@ import org.apache.pinot.client.ResultSetGroup; import org.hypertrace.core.query.service.ExecutionContext; import org.hypertrace.core.query.service.HandlerScopedFiltersConfig; +import org.hypertrace.core.query.service.HandlerScopedMaskingConfig; import org.hypertrace.core.query.service.QueryCost; import org.hypertrace.core.query.service.RequestHandler; import org.hypertrace.core.query.service.api.Expression; @@ -67,6 +68,7 @@ public class PinotBasedRequestHandler implements RequestHandler { private QueryRequestToPinotSQLConverter request2PinotSqlConverter; private final PinotMapConverter pinotMapConverter; private HandlerScopedFiltersConfig handlerScopedFiltersConfig; + private HandlerScopedMaskingConfig handlerScopedMaskingConfig; // The implementations of ResultSet are package private and hence there's no way to determine the // shape of the results // other than to do string comparison on the simple class names. In order to be able to unit test @@ -143,6 +145,8 @@ private void processConfig(Config config) { this.handlerScopedFiltersConfig = new HandlerScopedFiltersConfig(config, this.startTimeAttributeName); + this.handlerScopedMaskingConfig = + new HandlerScopedMaskingConfig(config, this.startTimeAttributeName, tenantColumnName); LOG.info( "Using {}ms as the threshold for logging slow queries of handler: {}", slowQueryThreshold, @@ -497,6 +501,7 @@ Observable convert(ResultSetGroup resultSetGroup, LinkedHashSet sel List rowBuilderList = new ArrayList<>(); if (resultSetGroup.getResultSetCount() > 0) { ResultSet resultSet = resultSetGroup.getResultSet(0); + handlerScopedMaskingConfig.parseColumns(resultSet); // Pinot has different Response format for selection and aggregation/group by query. if (resultSetTypePredicateProvider.isSelectionResultSetType(resultSet)) { // map merging is only supported in the selection. Filtering and Group by has its own @@ -508,8 +513,10 @@ Observable convert(ResultSetGroup resultSetGroup, LinkedHashSet sel handleAggregationAndGroupBy(resultSetGroup, rowBuilderList); } } + return Observable.fromIterable(rowBuilderList) .map(Builder::build) + .map(row -> handlerScopedMaskingConfig.mask(row)) .doOnNext(row -> LOG.debug("collect a row: {}", row)); } @@ -678,4 +685,8 @@ private boolean isInvalidExpression(Expression expression) { && viewDefinition.getColumnType(expression.getAttributeExpression().getAttributeId()) != ValueType.STRING_MAP; } + + HandlerScopedMaskingConfig getHandlerScopedMaskingConfig() { + return handlerScopedMaskingConfig; + } } diff --git a/query-service-impl/src/test/resources/application.conf b/query-service-impl/src/test/resources/application.conf index 5b954ec1..f05d40c5 100644 --- a/query-service-impl/src/test/resources/application.conf +++ b/query-service-impl/src/test/resources/application.conf @@ -91,6 +91,27 @@ service.config = { ] } ] + tenantScopedMaskingCriteria = [ + { + "tenantId": "testTenant", + "timeRangeAndMaskValues": [ + { + "startTimeMillis": 0, + "endTimeMillis": -1, + "maskValues": [ + { + "attributeId": "column_name1", + "maskedValue": "*" + }, + { + "attributeId": "column_name2", + "maskedValue": "*" + } + ] + }, + ] + } + ] viewDefinition = { viewName = spanEventView mapFields = ["tags"] diff --git a/query-service/src/main/resources/configs/common/application.conf b/query-service/src/main/resources/configs/common/application.conf index 35befa57..5233b3f5 100644 --- a/query-service/src/main/resources/configs/common/application.conf +++ b/query-service/src/main/resources/configs/common/application.conf @@ -94,6 +94,27 @@ service.config = { # ] # } # ] + tenantScopedMaskingCriteria = [ + { + "tenantId": "testTenant", + "timeRangeAndMaskValues": [ + { + "startTimeMillis": 0, + "endTimeMillis": -1, + "maskValues": [ + { + "attributeId": "column_name1", + "maskedValue": "*" + }, + { + "attributeId": "column_name2", + "maskedValue": "*" + } + ] + }, + ] + } + ] viewDefinition = { viewName = spanEventView mapFields = ["tags"] From 5e424ae8cfcc28a23a41417837a116a160c93b0c Mon Sep 17 00:00:00 2001 From: siddhant2001 Date: Mon, 30 Sep 2024 17:26:01 +0530 Subject: [PATCH 2/5] WIP: time range filter --- .../service/HandlerScopedMaskingConfig.java | 30 +++---- .../pinot/PinotBasedRequestHandler.java | 10 ++- .../pinot/PinotBasedRequestHandlerTest.java | 88 ++++++++++++++++++- .../src/test/resources/application.conf | 12 +-- 4 files changed, 109 insertions(+), 31 deletions(-) diff --git a/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java b/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java index 9ad9fabd..76cd5076 100644 --- a/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java +++ b/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java @@ -19,9 +19,8 @@ public class HandlerScopedMaskingConfig { private static final String TENANT_SCOPED_MASKS_CONFIG_KEY = "tenantScopedMaskingCriteria"; private final Map> tenantToMaskValuesMap; - private final String tenantColumnName; + private String tenantId; private final String timeFilterColumnName; - private int tenantColumnIndex; private int timeFilterColumnIndex; private final HashMap columnNameToIndexMap = new HashMap<>(); @@ -32,7 +31,7 @@ List getMaskValues(String tenantId) { } public HandlerScopedMaskingConfig( - Config config, Optional timeFilterColumnName, String tenantColumnName) { + Config config, String timeFilterColumnName, String tenantColumnName) { if (config.hasPath(TENANT_SCOPED_MASKS_CONFIG_KEY)) { this.tenantToMaskValuesMap = config.getConfigList(TENANT_SCOPED_MASKS_CONFIG_KEY).stream() @@ -45,19 +44,17 @@ public HandlerScopedMaskingConfig( this.tenantToMaskValuesMap = Collections.emptyMap(); } - this.tenantColumnName = tenantColumnName; - this.timeFilterColumnName = timeFilterColumnName.orElse(null); + this.timeFilterColumnName = timeFilterColumnName; } - public void parseColumns(ResultSet resultSet) { + public void parseColumns(ResultSet resultSet, String tenantId) { timeFilterColumnIndex = -1; - tenantColumnIndex = -1; columnNameToIndexMap.clear(); + this.tenantId = tenantId; for (int colIdx = 0; colIdx < resultSet.getColumnCount(); colIdx++) { - if (Objects.equals(this.tenantColumnName, resultSet.getColumnName(colIdx))) { - this.tenantColumnIndex = colIdx; - } else if (Objects.equals(this.timeFilterColumnName, resultSet.getColumnName(colIdx))) { + String temp = resultSet.getColumnName(colIdx); + if (Objects.equals(this.timeFilterColumnName, resultSet.getColumnName(colIdx))) { this.timeFilterColumnIndex = colIdx; } @@ -66,11 +63,14 @@ public void parseColumns(ResultSet resultSet) { } public Row mask(Row row) { - if (this.tenantColumnIndex == -1 || this.timeFilterColumnIndex == -1) { + if (this.timeFilterColumnIndex == -1) { + return row; + } + + List masks = getMaskValues(this.tenantId); + if (masks.isEmpty()) { return row; } - List masks = - getMaskValues(row.getColumn(this.tenantColumnIndex).getString()); Row.Builder maskedRowBuilder = Row.newBuilder(row); @@ -111,9 +111,9 @@ private class TenantMasks { String startTimeAttributeName; List maskValues; - private TenantMasks(Config config, Optional startTimeAttributeName) { + private TenantMasks(Config config, String startTimeAttributeName) { this.tenantId = config.getString(TENANT_ID_CONFIG_KEY); - this.startTimeAttributeName = startTimeAttributeName.orElse(null); + this.startTimeAttributeName = startTimeAttributeName; this.maskValues = config.getConfigList(TIME_RANGE_AND_MASK_VALUES_CONFIG_KEY).stream() .map(MaskValuesForTimeRange::new) diff --git a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java index 5d8f241a..72a1dbab 100644 --- a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java +++ b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java @@ -146,7 +146,7 @@ private void processConfig(Config config) { this.handlerScopedFiltersConfig = new HandlerScopedFiltersConfig(config, this.startTimeAttributeName); this.handlerScopedMaskingConfig = - new HandlerScopedMaskingConfig(config, this.startTimeAttributeName, tenantColumnName); + new HandlerScopedMaskingConfig(config, viewDefinition.getPhysicalColumnNames(this.startTimeAttributeName.orElse(null)).get(0), tenantColumnName); LOG.info( "Using {}ms as the threshold for logging slow queries of handler: {}", slowQueryThreshold, @@ -428,7 +428,8 @@ public Observable handleRequest( LOG.debug("Query results: [ {} ]", resultSetGroup.toString()); } // need to merge data especially for Pinot. That's why we need to track the map columns - return this.convert(resultSetGroup, executionContext.getSelectedColumns()) + return this.convert( + resultSetGroup, executionContext.getSelectedColumns(), executionContext.getTenantId()) .doOnComplete( () -> { long requestTimeMs = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); @@ -497,11 +498,12 @@ private Filter rewriteLeafFilter( return queryFilter; } - Observable convert(ResultSetGroup resultSetGroup, LinkedHashSet selectedAttributes) { + Observable convert( + ResultSetGroup resultSetGroup, LinkedHashSet selectedAttributes, String tenantId) { List rowBuilderList = new ArrayList<>(); if (resultSetGroup.getResultSetCount() > 0) { ResultSet resultSet = resultSetGroup.getResultSet(0); - handlerScopedMaskingConfig.parseColumns(resultSet); + handlerScopedMaskingConfig.parseColumns(resultSet, tenantId); // Pinot has different Response format for selection and aggregation/group by query. if (resultSetTypePredicateProvider.isSelectionResultSetType(resultSet)) { // map merging is only supported in the selection. Filtering and Group by has its own diff --git a/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandlerTest.java b/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandlerTest.java index af1296f7..3b9302bf 100644 --- a/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandlerTest.java +++ b/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandlerTest.java @@ -1353,7 +1353,8 @@ public void testConvertSimpleSelectionsQueryResultSet() throws IOException { ResultSetGroup resultSetGroup = mockResultSetGroup(List.of(resultSet)); verifyResponseRows( - pinotBasedRequestHandler.convert(resultSetGroup, new LinkedHashSet<>()), resultTable); + pinotBasedRequestHandler.convert(resultSetGroup, new LinkedHashSet<>(), "__default"), + resultTable); } @Test @@ -1371,7 +1372,8 @@ public void testConvertAggregationColumnsQueryResultSet() throws IOException { ResultSetGroup resultSetGroup = mockResultSetGroup(List.of(resultSet)); verifyResponseRows( - pinotBasedRequestHandler.convert(resultSetGroup, new LinkedHashSet<>()), resultTable); + pinotBasedRequestHandler.convert(resultSetGroup, new LinkedHashSet<>(), "__default"), + resultTable); } @Test @@ -1432,7 +1434,8 @@ public void testConvertSelectionsWithMapKeysAndValuesQueryResultSet() throws IOE }; verifyResponseRows( - pinotBasedRequestHandler.convert(resultSetGroup, new LinkedHashSet<>()), expectedRows); + pinotBasedRequestHandler.convert(resultSetGroup, new LinkedHashSet<>(), "__default"), + expectedRows); } @Test @@ -1467,7 +1470,8 @@ public void testConvertMultipleResultSetsInFResultSetGroup() throws IOException }; verifyResponseRows( - pinotBasedRequestHandler.convert(resultSetGroup, new LinkedHashSet<>()), expectedRows); + pinotBasedRequestHandler.convert(resultSetGroup, new LinkedHashSet<>(), "__default"), + expectedRows); } @Test @@ -1756,6 +1760,81 @@ public boolean isResultTableResultSetType(ResultSet resultSet) { } } + @Test + public void testMaskColumnValue() throws IOException { + for (Config config : serviceConfig.getConfigList("queryRequestHandlersConfig")) { + if (!isPinotConfig(config)) { + continue; + } + + if (!config.getString("name").equals("span-event-view-handler")) { + continue; + } + + // Mock the PinotClient + PinotClient pinotClient = mock(PinotClient.class); + PinotClientFactory factory = mock(PinotClientFactory.class); + when(factory.getPinotClient(any())).thenReturn(pinotClient); + + String[][] resultTable = + new String[][] { + {"test-span-id-1", "trace-id-1", }, + {"test-span-id-2", "trace-id-1"}, + {"test-span-id-3", "trace-id-1"}, + {"test-span-id-4", "trace-id-2"} + }; + List columnNames = List.of("span_id", "trace_id"); + ResultSet resultSet = mockResultSet(4, 2, columnNames, resultTable); + ResultSetGroup resultSetGroup = mockResultSetGroup(List.of(resultSet)); + + PinotBasedRequestHandler handler = + new PinotBasedRequestHandler( + config.getString("name"), + config.getConfig("requestHandlerInfo"), + new ResultSetTypePredicateProvider() { + @Override + public boolean isSelectionResultSetType(ResultSet resultSet) { + return true; + } + + @Override + public boolean isResultTableResultSetType(ResultSet resultSet) { + return false; + } + }, + factory); + + QueryRequest request = + QueryRequest.newBuilder() + .addSelection(QueryRequestBuilderUtils.createColumnExpression("EVENT.id")) + .addSelection(QueryRequestBuilderUtils.createColumnExpression("EVENT.traceId")) + .build(); + ExecutionContext context = new ExecutionContext("maskTenant", request); + + // The query filter is based on both isEntrySpan and startTime. Since the viewFilter + // checks for both the true and false values of isEntrySpan and query filter only needs + // "true", isEntrySpan predicate is still passed to the store in the query. + String expectedQuery = + "Select span_id, trace_id FROM spanEventView WHERE tenant_id = ?"; + Params params = + Params.newBuilder() + .addStringParam("maskTenant") + .build(); + when(pinotClient.executeQuery(expectedQuery, params)).thenReturn(resultSetGroup); + + String[][] expectedTable = + new String[][] { + {"*", "trace-id-1", }, + {"*", "trace-id-1"}, + {"*", "trace-id-1"}, + {"*", "trace-id-2"} + }; + + verifyResponseRows(handler.handleRequest(request, context), expectedTable); + } + } + + @Test public void testViewColumnFilterRemovalComplexCase() throws IOException { for (Config config : serviceConfig.getConfigList("queryRequestHandlersConfig")) { @@ -2002,6 +2081,7 @@ private ResultSetGroup mockResultSetGroup(List resultSets) { private void verifyResponseRows(Observable rowObservable, String[][] expectedResultTable) throws IOException { + System.out.println(rowObservable); List rows = rowObservable.toList().blockingGet(); assertEquals(expectedResultTable.length, rows.size()); for (int rowIdx = 0; rowIdx < rows.size(); rowIdx++) { diff --git a/query-service-impl/src/test/resources/application.conf b/query-service-impl/src/test/resources/application.conf index f05d40c5..dca49e26 100644 --- a/query-service-impl/src/test/resources/application.conf +++ b/query-service-impl/src/test/resources/application.conf @@ -93,18 +93,14 @@ service.config = { ] tenantScopedMaskingCriteria = [ { - "tenantId": "testTenant", + "tenantId": "maskTenant", "timeRangeAndMaskValues": [ { - "startTimeMillis": 0, - "endTimeMillis": -1, + startTimeMillis = 1 + endTimeMillis = 100 "maskValues": [ { - "attributeId": "column_name1", - "maskedValue": "*" - }, - { - "attributeId": "column_name2", + "attributeId": "span_id", "maskedValue": "*" } ] From 4f178ae5ae96f3e18179ab95fdcc04bf26c478fd Mon Sep 17 00:00:00 2001 From: siddhant2001 Date: Mon, 30 Sep 2024 21:03:07 +0530 Subject: [PATCH 3/5] working --- .../service/HandlerScopedMaskingConfig.java | 116 ++++++++---------- .../pinot/PinotBasedRequestHandler.java | 21 ++-- .../pinot/PinotBasedRequestHandlerTest.java | 89 +++++++------- .../src/test/resources/application.conf | 4 +- .../resources/configs/common/application.conf | 34 ++--- 5 files changed, 128 insertions(+), 136 deletions(-) diff --git a/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java b/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java index 76cd5076..73561a9e 100644 --- a/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java +++ b/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java @@ -1,41 +1,30 @@ package org.hypertrace.core.query.service; import com.typesafe.config.Config; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; import lombok.Value; import lombok.experimental.NonFinal; import lombok.extern.slf4j.Slf4j; -import org.apache.pinot.client.ResultSet; -import org.hypertrace.core.query.service.api.Row; @Slf4j public class HandlerScopedMaskingConfig { private static final String TENANT_SCOPED_MASKS_CONFIG_KEY = "tenantScopedMaskingCriteria"; private final Map> tenantToMaskValuesMap; - private String tenantId; - private final String timeFilterColumnName; - private int timeFilterColumnIndex; - private final HashMap columnNameToIndexMap = new HashMap<>(); + private HashMap shouldMaskAttribute = new HashMap<>(); + private HashMap maskedValue = new HashMap<>(); - List getMaskValues(String tenantId) { - if (tenantToMaskValuesMap.containsKey(tenantId)) return tenantToMaskValuesMap.get(tenantId); - - return new ArrayList<>(); - } - - public HandlerScopedMaskingConfig( - Config config, String timeFilterColumnName, String tenantColumnName) { + public HandlerScopedMaskingConfig(Config config) { if (config.hasPath(TENANT_SCOPED_MASKS_CONFIG_KEY)) { this.tenantToMaskValuesMap = config.getConfigList(TENANT_SCOPED_MASKS_CONFIG_KEY).stream() - .map(maskConfig -> new TenantMasks(maskConfig, timeFilterColumnName)) + .map(maskConfig -> new TenantMasks(maskConfig)) .collect( Collectors.toMap( tenantFilters -> tenantFilters.tenantId, @@ -43,63 +32,64 @@ public HandlerScopedMaskingConfig( } else { this.tenantToMaskValuesMap = Collections.emptyMap(); } - - this.timeFilterColumnName = timeFilterColumnName; } - public void parseColumns(ResultSet resultSet, String tenantId) { - timeFilterColumnIndex = -1; - columnNameToIndexMap.clear(); - this.tenantId = tenantId; - - for (int colIdx = 0; colIdx < resultSet.getColumnCount(); colIdx++) { - String temp = resultSet.getColumnName(colIdx); - if (Objects.equals(this.timeFilterColumnName, resultSet.getColumnName(colIdx))) { - this.timeFilterColumnIndex = colIdx; - } + public void parseColumns(ExecutionContext executionContext) { + shouldMaskAttribute.clear(); + String tenantId = executionContext.getTenantId(); - columnNameToIndexMap.put(resultSet.getColumnName(colIdx), colIdx); + if (!tenantToMaskValuesMap.containsKey(tenantId)) { + return; } - } - public Row mask(Row row) { - if (this.timeFilterColumnIndex == -1) { - return row; + Optional queryTimeRange = executionContext.getQueryTimeRange(); + Instant queryStartTime, queryEndTime; + if (queryTimeRange.isPresent()) { + queryStartTime = queryTimeRange.get().getStartTime(); + queryEndTime = queryTimeRange.get().getEndTime(); + } else { + queryEndTime = Instant.MAX; + queryStartTime = Instant.MIN; } - - List masks = getMaskValues(this.tenantId); - if (masks.isEmpty()) { - return row; + for (MaskValuesForTimeRange timeRangeAndMasks : tenantToMaskValuesMap.get(tenantId)) { + boolean timeRangeOverlap = + isTimeRangeOverlap(timeRangeAndMasks, queryStartTime, queryEndTime); + + if (timeRangeOverlap) { + Map attributeToMaskedValue = + timeRangeAndMasks.maskValues.attributeToMaskedValue; + for (String attribute : attributeToMaskedValue.keySet()) { + shouldMaskAttribute.put(attribute, true); + maskedValue.put(attribute, attributeToMaskedValue.get(attribute)); + } + } } + } - Row.Builder maskedRowBuilder = Row.newBuilder(row); + private static boolean isTimeRangeOverlap( + MaskValuesForTimeRange timeRangeAndMasks, Instant queryStartTime, Instant queryEndTime) { + boolean timeRangeOverlap = true; - for (MaskValuesForTimeRange mask : masks) { - boolean toBeMasked = true; - if (mask.getEndTimeMillis().isPresent()) { - if (mask.getEndTimeMillis().get() < row.getColumn(this.timeFilterColumnIndex).getLong()) { - toBeMasked = false; - } - } - if (mask.getStartTimeMillis().isPresent()) { - if (mask.getStartTimeMillis().get() > row.getColumn(this.timeFilterColumnIndex).getLong()) { - toBeMasked = false; - } + if (timeRangeAndMasks.getStartTimeMillis().isPresent()) { + Instant startTimeInstant = Instant.ofEpochMilli(timeRangeAndMasks.getStartTimeMillis().get()); + if (startTimeInstant.isBefore(queryStartTime) || startTimeInstant.isAfter(queryEndTime)) { + timeRangeOverlap = false; } - if (toBeMasked) { - for (String columnName : mask.maskValues.getColumnToMaskedValue().keySet()) { - int colIdx = columnNameToIndexMap.get(columnName); - org.hypertrace.core.query.service.api.Value value = - org.hypertrace.core.query.service.api.Value.newBuilder() - .setString(mask.maskValues.getColumnToMaskedValue().get(columnName)) - .build(); - maskedRowBuilder.setColumn(colIdx, value); - } + Instant endTimeInstant = Instant.ofEpochMilli(timeRangeAndMasks.getStartTimeMillis().get()); + if (endTimeInstant.isBefore(queryStartTime) || endTimeInstant.isAfter(queryEndTime)) { + timeRangeOverlap = false; } } + return timeRangeOverlap; + } + + public boolean shouldMask(String attributeName) { + return this.maskedValue.containsKey(attributeName); + } - return maskedRowBuilder.build(); + public String getMaskedValue(String attributeName) { + return this.maskedValue.get(attributeName); } @Value @@ -108,12 +98,10 @@ private class TenantMasks { private static final String TENANT_ID_CONFIG_KEY = "tenantId"; private static final String TIME_RANGE_AND_MASK_VALUES_CONFIG_KEY = "timeRangeAndMaskValues"; String tenantId; - String startTimeAttributeName; List maskValues; - private TenantMasks(Config config, String startTimeAttributeName) { + private TenantMasks(Config config) { this.tenantId = config.getString(TENANT_ID_CONFIG_KEY); - this.startTimeAttributeName = startTimeAttributeName; this.maskValues = config.getConfigList(TIME_RANGE_AND_MASK_VALUES_CONFIG_KEY).stream() .map(MaskValuesForTimeRange::new) @@ -123,10 +111,10 @@ private TenantMasks(Config config, String startTimeAttributeName) { @Value private class MaskValues { - Map columnToMaskedValue; + Map attributeToMaskedValue; MaskValues(Map columnToMaskedValue) { - this.columnToMaskedValue = columnToMaskedValue; + this.attributeToMaskedValue = columnToMaskedValue; } } diff --git a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java index 72a1dbab..293ae327 100644 --- a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java +++ b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java @@ -145,8 +145,7 @@ private void processConfig(Config config) { this.handlerScopedFiltersConfig = new HandlerScopedFiltersConfig(config, this.startTimeAttributeName); - this.handlerScopedMaskingConfig = - new HandlerScopedMaskingConfig(config, viewDefinition.getPhysicalColumnNames(this.startTimeAttributeName.orElse(null)).get(0), tenantColumnName); + this.handlerScopedMaskingConfig = new HandlerScopedMaskingConfig(config); LOG.info( "Using {}ms as the threshold for logging slow queries of handler: {}", slowQueryThreshold, @@ -428,8 +427,7 @@ public Observable handleRequest( LOG.debug("Query results: [ {} ]", resultSetGroup.toString()); } // need to merge data especially for Pinot. That's why we need to track the map columns - return this.convert( - resultSetGroup, executionContext.getSelectedColumns(), executionContext.getTenantId()) + return this.convert(resultSetGroup, executionContext) .doOnComplete( () -> { long requestTimeMs = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); @@ -498,12 +496,13 @@ private Filter rewriteLeafFilter( return queryFilter; } - Observable convert( - ResultSetGroup resultSetGroup, LinkedHashSet selectedAttributes, String tenantId) { + Observable convert(ResultSetGroup resultSetGroup, ExecutionContext executionContext) { + String tenantId = executionContext.getTenantId(); + LinkedHashSet selectedAttributes = executionContext.getSelectedColumns(); List rowBuilderList = new ArrayList<>(); if (resultSetGroup.getResultSetCount() > 0) { ResultSet resultSet = resultSetGroup.getResultSet(0); - handlerScopedMaskingConfig.parseColumns(resultSet, tenantId); + handlerScopedMaskingConfig.parseColumns(executionContext); // Pinot has different Response format for selection and aggregation/group by query. if (resultSetTypePredicateProvider.isSelectionResultSetType(resultSet)) { // map merging is only supported in the selection. Filtering and Group by has its own @@ -518,7 +517,7 @@ Observable convert( return Observable.fromIterable(rowBuilderList) .map(Builder::build) - .map(row -> handlerScopedMaskingConfig.mask(row)) + // .map(row -> handlerScopedMaskingConfig.mask(row)) .doOnNext(row -> LOG.debug("collect a row: {}", row)); } @@ -545,7 +544,11 @@ private void handleSelection( for (String logicalName : selectedAttributes) { // colVal will never be null. But getDataRow can throw a runtime exception if it failed // to retrieve data - String colVal = resultAnalyzer.getDataFromRow(rowId, logicalName); + String colVal = + !handlerScopedMaskingConfig.shouldMask(logicalName) + ? resultAnalyzer.getDataFromRow(rowId, logicalName) + : handlerScopedMaskingConfig.getMaskedValue(logicalName); + builder.addColumn(Value.newBuilder().setString(colVal).build()); } } diff --git a/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandlerTest.java b/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandlerTest.java index 3b9302bf..9b4b5ded 100644 --- a/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandlerTest.java +++ b/query-service-impl/src/test/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandlerTest.java @@ -18,7 +18,6 @@ import com.typesafe.config.ConfigFactory; import io.reactivex.rxjava3.core.Observable; import java.io.IOException; -import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -1353,7 +1352,8 @@ public void testConvertSimpleSelectionsQueryResultSet() throws IOException { ResultSetGroup resultSetGroup = mockResultSetGroup(List.of(resultSet)); verifyResponseRows( - pinotBasedRequestHandler.convert(resultSetGroup, new LinkedHashSet<>(), "__default"), + pinotBasedRequestHandler.convert( + resultSetGroup, new ExecutionContext("__default", QueryRequest.newBuilder().build())), resultTable); } @@ -1372,7 +1372,8 @@ public void testConvertAggregationColumnsQueryResultSet() throws IOException { ResultSetGroup resultSetGroup = mockResultSetGroup(List.of(resultSet)); verifyResponseRows( - pinotBasedRequestHandler.convert(resultSetGroup, new LinkedHashSet<>(), "__default"), + pinotBasedRequestHandler.convert( + resultSetGroup, new ExecutionContext("__default", QueryRequest.newBuilder().build())), resultTable); } @@ -1434,7 +1435,8 @@ public void testConvertSelectionsWithMapKeysAndValuesQueryResultSet() throws IOE }; verifyResponseRows( - pinotBasedRequestHandler.convert(resultSetGroup, new LinkedHashSet<>(), "__default"), + pinotBasedRequestHandler.convert( + resultSetGroup, new ExecutionContext("__default", QueryRequest.newBuilder().build())), expectedRows); } @@ -1470,7 +1472,8 @@ public void testConvertMultipleResultSetsInFResultSetGroup() throws IOException }; verifyResponseRows( - pinotBasedRequestHandler.convert(resultSetGroup, new LinkedHashSet<>(), "__default"), + pinotBasedRequestHandler.convert( + resultSetGroup, new ExecutionContext("__default", QueryRequest.newBuilder().build())), expectedRows); } @@ -1777,64 +1780,63 @@ public void testMaskColumnValue() throws IOException { when(factory.getPinotClient(any())).thenReturn(pinotClient); String[][] resultTable = - new String[][] { - {"test-span-id-1", "trace-id-1", }, - {"test-span-id-2", "trace-id-1"}, - {"test-span-id-3", "trace-id-1"}, - {"test-span-id-4", "trace-id-2"} - }; + new String[][] { + { + "test-span-id-1", "trace-id-1", + }, + {"test-span-id-2", "trace-id-1"}, + {"test-span-id-3", "trace-id-1"}, + {"test-span-id-4", "trace-id-2"} + }; List columnNames = List.of("span_id", "trace_id"); ResultSet resultSet = mockResultSet(4, 2, columnNames, resultTable); ResultSetGroup resultSetGroup = mockResultSetGroup(List.of(resultSet)); PinotBasedRequestHandler handler = - new PinotBasedRequestHandler( - config.getString("name"), - config.getConfig("requestHandlerInfo"), - new ResultSetTypePredicateProvider() { - @Override - public boolean isSelectionResultSetType(ResultSet resultSet) { - return true; - } - - @Override - public boolean isResultTableResultSetType(ResultSet resultSet) { - return false; - } - }, - factory); + new PinotBasedRequestHandler( + config.getString("name"), + config.getConfig("requestHandlerInfo"), + new ResultSetTypePredicateProvider() { + @Override + public boolean isSelectionResultSetType(ResultSet resultSet) { + return true; + } + + @Override + public boolean isResultTableResultSetType(ResultSet resultSet) { + return false; + } + }, + factory); QueryRequest request = - QueryRequest.newBuilder() - .addSelection(QueryRequestBuilderUtils.createColumnExpression("EVENT.id")) - .addSelection(QueryRequestBuilderUtils.createColumnExpression("EVENT.traceId")) - .build(); + QueryRequest.newBuilder() + .addSelection(QueryRequestBuilderUtils.createColumnExpression("EVENT.id")) + .addSelection(QueryRequestBuilderUtils.createColumnExpression("EVENT.traceId")) + .build(); ExecutionContext context = new ExecutionContext("maskTenant", request); // The query filter is based on both isEntrySpan and startTime. Since the viewFilter // checks for both the true and false values of isEntrySpan and query filter only needs // "true", isEntrySpan predicate is still passed to the store in the query. - String expectedQuery = - "Select span_id, trace_id FROM spanEventView WHERE tenant_id = ?"; - Params params = - Params.newBuilder() - .addStringParam("maskTenant") - .build(); + String expectedQuery = "Select span_id, trace_id FROM spanEventView WHERE tenant_id = ?"; + Params params = Params.newBuilder().addStringParam("maskTenant").build(); when(pinotClient.executeQuery(expectedQuery, params)).thenReturn(resultSetGroup); String[][] expectedTable = - new String[][] { - {"*", "trace-id-1", }, - {"*", "trace-id-1"}, - {"*", "trace-id-1"}, - {"*", "trace-id-2"} - }; + new String[][] { + { + "*", "trace-id-1", + }, + {"*", "trace-id-1"}, + {"*", "trace-id-1"}, + {"*", "trace-id-2"} + }; verifyResponseRows(handler.handleRequest(request, context), expectedTable); } } - @Test public void testViewColumnFilterRemovalComplexCase() throws IOException { for (Config config : serviceConfig.getConfigList("queryRequestHandlersConfig")) { @@ -2081,7 +2083,6 @@ private ResultSetGroup mockResultSetGroup(List resultSets) { private void verifyResponseRows(Observable rowObservable, String[][] expectedResultTable) throws IOException { - System.out.println(rowObservable); List rows = rowObservable.toList().blockingGet(); assertEquals(expectedResultTable.length, rows.size()); for (int rowIdx = 0; rowIdx < rows.size(); rowIdx++) { diff --git a/query-service-impl/src/test/resources/application.conf b/query-service-impl/src/test/resources/application.conf index dca49e26..25bc94d7 100644 --- a/query-service-impl/src/test/resources/application.conf +++ b/query-service-impl/src/test/resources/application.conf @@ -96,11 +96,9 @@ service.config = { "tenantId": "maskTenant", "timeRangeAndMaskValues": [ { - startTimeMillis = 1 - endTimeMillis = 100 "maskValues": [ { - "attributeId": "span_id", + "attributeId": "EVENT.id", "maskedValue": "*" } ] diff --git a/query-service/src/main/resources/configs/common/application.conf b/query-service/src/main/resources/configs/common/application.conf index 5233b3f5..25038550 100644 --- a/query-service/src/main/resources/configs/common/application.conf +++ b/query-service/src/main/resources/configs/common/application.conf @@ -99,22 +99,24 @@ service.config = { "tenantId": "testTenant", "timeRangeAndMaskValues": [ { - "startTimeMillis": 0, - "endTimeMillis": -1, - "maskValues": [ - { - "attributeId": "column_name1", - "maskedValue": "*" - }, - { - "attributeId": "column_name2", - "maskedValue": "*" - } - ] - }, - ] - } - ] +# "startTimeMillis": 0, +# # No startTimeMillis implies no filter on startTime +# "endTimeMillis": 1000, +# # No endTimeMillis implies no filter on endTime +# "maskValues": [ +# { +# "attributeId": "attribute_1", +# "maskedValue": "*" +# }, +# { +# "attributeId": "attribute_2", +# "maskedValue": "*" +# } +# ] +# }, +# ] +# } +# ] viewDefinition = { viewName = spanEventView mapFields = ["tags"] From a2a381c0cb058995a1f9a962099b3baf3613b538 Mon Sep 17 00:00:00 2001 From: siddhant2001 Date: Tue, 1 Oct 2024 14:27:49 +0530 Subject: [PATCH 4/5] fix app.conf --- .../src/main/resources/configs/common/application.conf | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/query-service/src/main/resources/configs/common/application.conf b/query-service/src/main/resources/configs/common/application.conf index 25038550..8222ce8b 100644 --- a/query-service/src/main/resources/configs/common/application.conf +++ b/query-service/src/main/resources/configs/common/application.conf @@ -94,11 +94,11 @@ service.config = { # ] # } # ] - tenantScopedMaskingCriteria = [ - { - "tenantId": "testTenant", - "timeRangeAndMaskValues": [ - { +# tenantScopedMaskingCriteria = [ +# { +# "tenantId": "testTenant", +# "timeRangeAndMaskValues": [ +# { # "startTimeMillis": 0, # # No startTimeMillis implies no filter on startTime # "endTimeMillis": 1000, From 3cd9fe8c72b85e355354c4acb31a162688522983 Mon Sep 17 00:00:00 2001 From: kotharironak Date: Tue, 1 Oct 2024 15:28:17 +0530 Subject: [PATCH 5/5] suggested changes --- .../service/HandlerScopedMaskingConfig.java | 26 ++++++----------- .../pinot/PinotBasedRequestHandler.java | 28 ++++++++----------- 2 files changed, 20 insertions(+), 34 deletions(-) diff --git a/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java b/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java index 73561a9e..fa5871f5 100644 --- a/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java +++ b/query-service-impl/src/main/java/org/hypertrace/core/query/service/HandlerScopedMaskingConfig.java @@ -17,8 +17,6 @@ public class HandlerScopedMaskingConfig { private static final String TENANT_SCOPED_MASKS_CONFIG_KEY = "tenantScopedMaskingCriteria"; private final Map> tenantToMaskValuesMap; - private HashMap shouldMaskAttribute = new HashMap<>(); - private HashMap maskedValue = new HashMap<>(); public HandlerScopedMaskingConfig(Config config) { if (config.hasPath(TENANT_SCOPED_MASKS_CONFIG_KEY)) { @@ -34,12 +32,12 @@ public HandlerScopedMaskingConfig(Config config) { } } - public void parseColumns(ExecutionContext executionContext) { - shouldMaskAttribute.clear(); - String tenantId = executionContext.getTenantId(); + public Map getMaskedColumnsToValueMap(ExecutionContext executionContext) { + Map maskedColumnsToValueMap = new HashMap<>(); + String tenantId = executionContext.getTenantId(); if (!tenantToMaskValuesMap.containsKey(tenantId)) { - return; + return maskedColumnsToValueMap; } Optional queryTimeRange = executionContext.getQueryTimeRange(); @@ -53,17 +51,17 @@ public void parseColumns(ExecutionContext executionContext) { } for (MaskValuesForTimeRange timeRangeAndMasks : tenantToMaskValuesMap.get(tenantId)) { boolean timeRangeOverlap = - isTimeRangeOverlap(timeRangeAndMasks, queryStartTime, queryEndTime); + isTimeRangeOverlap(timeRangeAndMasks, queryStartTime, queryEndTime); if (timeRangeOverlap) { Map attributeToMaskedValue = - timeRangeAndMasks.maskValues.attributeToMaskedValue; + timeRangeAndMasks.maskValues.attributeToMaskedValue; for (String attribute : attributeToMaskedValue.keySet()) { - shouldMaskAttribute.put(attribute, true); - maskedValue.put(attribute, attributeToMaskedValue.get(attribute)); + maskedColumnsToValueMap.put(attribute, attributeToMaskedValue.get(attribute)); } } } + return maskedColumnsToValueMap; } private static boolean isTimeRangeOverlap( @@ -84,14 +82,6 @@ private static boolean isTimeRangeOverlap( return timeRangeOverlap; } - public boolean shouldMask(String attributeName) { - return this.maskedValue.containsKey(attributeName); - } - - public String getMaskedValue(String attributeName) { - return this.maskedValue.get(attributeName); - } - @Value @NonFinal private class TenantMasks { diff --git a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java index 293ae327..f4a918ac 100644 --- a/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java +++ b/query-service-impl/src/main/java/org/hypertrace/core/query/service/pinot/PinotBasedRequestHandler.java @@ -426,8 +426,9 @@ public Observable handleRequest( if (LOG.isDebugEnabled()) { LOG.debug("Query results: [ {} ]", resultSetGroup.toString()); } + // need to merge data especially for Pinot. That's why we need to track the map columns - return this.convert(resultSetGroup, executionContext) + return this.convert(resultSetGroup, executionContext.getSelectedColumns(), handlerScopedMaskingConfig.getMaskedColumnsToValueMap(executionContext)) .doOnComplete( () -> { long requestTimeMs = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); @@ -496,35 +497,32 @@ private Filter rewriteLeafFilter( return queryFilter; } - Observable convert(ResultSetGroup resultSetGroup, ExecutionContext executionContext) { - String tenantId = executionContext.getTenantId(); - LinkedHashSet selectedAttributes = executionContext.getSelectedColumns(); + Observable convert(ResultSetGroup resultSetGroup, LinkedHashSet selectedAttributes, Map maskedColumnsToValueMap) { List rowBuilderList = new ArrayList<>(); if (resultSetGroup.getResultSetCount() > 0) { ResultSet resultSet = resultSetGroup.getResultSet(0); - handlerScopedMaskingConfig.parseColumns(executionContext); // Pinot has different Response format for selection and aggregation/group by query. if (resultSetTypePredicateProvider.isSelectionResultSetType(resultSet)) { // map merging is only supported in the selection. Filtering and Group by has its own // syntax in Pinot - handleSelection(resultSetGroup, rowBuilderList, selectedAttributes); + handleSelection(resultSetGroup, rowBuilderList, selectedAttributes, maskedColumnsToValueMap); } else if (resultSetTypePredicateProvider.isResultTableResultSetType(resultSet)) { - handleTableFormatResultSet(resultSetGroup, rowBuilderList); + handleTableFormatResultSet(resultSetGroup, rowBuilderList, maskedColumnsToValueMap); } else { - handleAggregationAndGroupBy(resultSetGroup, rowBuilderList); + handleAggregationAndGroupBy(resultSetGroup, rowBuilderList, maskedColumnsToValueMap); } } return Observable.fromIterable(rowBuilderList) .map(Builder::build) - // .map(row -> handlerScopedMaskingConfig.mask(row)) .doOnNext(row -> LOG.debug("collect a row: {}", row)); } private void handleSelection( ResultSetGroup resultSetGroup, List rowBuilderList, - LinkedHashSet selectedAttributes) { + LinkedHashSet selectedAttributes, + Map maskedColumnsToValueMap) { int resultSetGroupCount = resultSetGroup.getResultSetCount(); for (int i = 0; i < resultSetGroupCount; i++) { ResultSet resultSet = resultSetGroup.getResultSet(i); @@ -544,10 +542,8 @@ private void handleSelection( for (String logicalName : selectedAttributes) { // colVal will never be null. But getDataRow can throw a runtime exception if it failed // to retrieve data - String colVal = - !handlerScopedMaskingConfig.shouldMask(logicalName) - ? resultAnalyzer.getDataFromRow(rowId, logicalName) - : handlerScopedMaskingConfig.getMaskedValue(logicalName); + String colVal = maskedColumnsToValueMap.containsKey(logicalName) + ? maskedColumnsToValueMap.get(logicalName): resultAnalyzer.getDataFromRow(rowId, logicalName); builder.addColumn(Value.newBuilder().setString(colVal).build()); } @@ -556,7 +552,7 @@ private void handleSelection( } private void handleAggregationAndGroupBy( - ResultSetGroup resultSetGroup, List rowBuilderList) { + ResultSetGroup resultSetGroup, List rowBuilderList, Map maskedColumnsToValueMap) { int resultSetGroupCount = resultSetGroup.getResultSetCount(); Map groupKey2RowIdMap = new HashMap<>(); for (int i = 0; i < resultSetGroupCount; i++) { @@ -600,7 +596,7 @@ private void handleAggregationAndGroupBy( } private void handleTableFormatResultSet( - ResultSetGroup resultSetGroup, List rowBuilderList) { + ResultSetGroup resultSetGroup, List rowBuilderList, Map maskedColumnsToValueMap) { int resultSetGroupCount = resultSetGroup.getResultSetCount(); for (int i = 0; i < resultSetGroupCount; i++) { ResultSet resultSet = resultSetGroup.getResultSet(i);