From 1e6b7b6e30d82cc41a038f7ef2dbb0eed9150c34 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Thu, 12 Sep 2024 20:03:12 +0530 Subject: [PATCH 1/5] Normalize schema fingerprint for column permutations --- .../metadata/FingerprintGenerator.java | 35 +++++++++++++++++-- .../metadata/FingerprintGeneratorTest.java | 35 +++++++++++++++---- 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java index f1cb8ea0a505..4c439d0c0af2 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.hash.Hasher; import com.google.common.hash.Hashing; import com.google.common.io.BaseEncoding; @@ -30,8 +31,13 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.SchemaPayload; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; /** * Utility to generate fingerprint for an object. @@ -53,12 +59,20 @@ public FingerprintGenerator(ObjectMapper objectMapper) * Generates fingerprint or hash string for an object using SHA-256 hash algorithm. */ @SuppressWarnings("UnstableApiUsage") - public String generateFingerprint(SchemaPayload schemaPayload, String dataSource, int version) + public String generateFingerprint(final SchemaPayload schemaPayload, final String dataSource, final int version) { + final RowSignature rowSignature = schemaPayload.getRowSignature(); + // Sort the column names in lexicographic order + // This ensures that all permutations of a given columns would result in the same fingerprint + // thus avoiding schema explosion in the metadata database + // Note that this signature is not persisted anywhere, it is only used for fingerprint computation + final RowSignature sortedSignature = getLexicographicallySortedSignature(rowSignature); + final SchemaPayload updatedPayload = new SchemaPayload(sortedSignature, schemaPayload.getAggregatorFactories()); try { + final Hasher hasher = Hashing.sha256().newHasher(); - hasher.putBytes(objectMapper.writeValueAsBytes(schemaPayload)); + hasher.putBytes(objectMapper.writeValueAsBytes(updatedPayload)); // add delimiter, inspired from org.apache.druid.metadata.PendingSegmentRecord.computeSequenceNamePrevIdSha1 hasher.putByte((byte) 0xff); @@ -82,4 +96,21 @@ public String generateFingerprint(SchemaPayload schemaPayload, String dataSource ); } } + + @VisibleForTesting + protected RowSignature getLexicographicallySortedSignature(final RowSignature rowSignature) + { + final List columns = new ArrayList<>(rowSignature.getColumnNames()); + + Collections.sort(columns); + + final RowSignature.Builder sortedSignature = RowSignature.builder(); + + for (String column : columns) { + ColumnType type = rowSignature.getColumnType(column).orElse(null); + sortedSignature.add(column, type); + } + + return sortedSignature.build(); + } } diff --git a/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java b/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java index 093585508029..49f829efcc79 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.common.config.NullHandling; import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory; import org.apache.druid.query.aggregation.firstlast.first.LongFirstAggregatorFactory; import org.apache.druid.segment.SchemaPayload; import org.apache.druid.segment.TestHelper; @@ -45,13 +46,20 @@ public class FingerprintGeneratorTest @Test public void testGenerateFingerprint_precalculatedHash() { - RowSignature rowSignature = RowSignature.builder().add("c1", ColumnType.FLOAT).build(); + RowSignature rowSignature = + RowSignature.builder() + .add("c1", ColumnType.LONG) + .add("c0", ColumnType.STRING) + .add("c2", ColumnType.FLOAT) + .add("c3", ColumnType.DOUBLE) + .build(); Map aggregatorFactoryMap = new HashMap<>(); - aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "long-col", null)); + aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "c1", null)); + aggregatorFactoryMap.put("stringAny", new StringAnyAggregatorFactory("stringAny", "c0", 1024, true)); SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregatorFactoryMap); - String expected = "DEE5E8F59833102F0FA5B10F8B8884EA15220D1D2A5F6097A93D8309132E1039"; + String expected = "82E774457D26D0B8D481B6C39872070B25EA3C72C6EFC107B346FA42641740E1"; Assert.assertEquals(expected, fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 0)); } @@ -60,25 +68,38 @@ public void testGenerateFingerprint_columnPermutation() { RowSignature rowSignature = RowSignature.builder() - .add("c1", ColumnType.FLOAT) .add("c2", ColumnType.LONG) + .add("c1", ColumnType.FLOAT) .add("c3", ColumnType.DOUBLE) + .add("c0", ColumnType.STRING) .build(); Map aggregatorFactoryMap = new HashMap<>(); - aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "long-col", null)); + aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "c2", null)); + aggregatorFactoryMap.put("stringAny", new StringAnyAggregatorFactory("stringAny", "c0", 1024, true)); SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregatorFactoryMap); RowSignature rowSignaturePermutation = RowSignature.builder() .add("c2", ColumnType.LONG) + .add("c0", ColumnType.STRING) .add("c3", ColumnType.DOUBLE) .add("c1", ColumnType.FLOAT) .build(); - SchemaPayload schemaPayloadNew = new SchemaPayload(rowSignaturePermutation, aggregatorFactoryMap); - Assert.assertNotEquals( + Map aggregatorFactoryMapForPermutation = new HashMap<>(); + aggregatorFactoryMapForPermutation.put( + "stringAny", + new StringAnyAggregatorFactory("stringAny", "c0", 1024, true) + ); + aggregatorFactoryMapForPermutation.put( + "longFirst", + new LongFirstAggregatorFactory("longFirst", "c2", null) + ); + + SchemaPayload schemaPayloadNew = new SchemaPayload(rowSignaturePermutation, aggregatorFactoryMapForPermutation); + Assert.assertEquals( fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 0), fingerprintGenerator.generateFingerprint(schemaPayloadNew, "ds", 0) ); From c39326ee4e5b9e3193d7174356c6ee4c7100963a Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Fri, 13 Sep 2024 13:34:03 +0530 Subject: [PATCH 2/5] Add test --- .../metadata/FingerprintGenerator.java | 3 +- .../metadata/FingerprintGeneratorTest.java | 28 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java index 4c439d0c0af2..ea100cb0aa63 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java @@ -61,12 +61,11 @@ public FingerprintGenerator(ObjectMapper objectMapper) @SuppressWarnings("UnstableApiUsage") public String generateFingerprint(final SchemaPayload schemaPayload, final String dataSource, final int version) { - final RowSignature rowSignature = schemaPayload.getRowSignature(); // Sort the column names in lexicographic order // This ensures that all permutations of a given columns would result in the same fingerprint // thus avoiding schema explosion in the metadata database // Note that this signature is not persisted anywhere, it is only used for fingerprint computation - final RowSignature sortedSignature = getLexicographicallySortedSignature(rowSignature); + final RowSignature sortedSignature = getLexicographicallySortedSignature(schemaPayload.getRowSignature()); final SchemaPayload updatedPayload = new SchemaPayload(sortedSignature, schemaPayload.getAggregatorFactories()); try { diff --git a/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java b/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java index 49f829efcc79..1293583ed33c 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java @@ -31,7 +31,10 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; public class FingerprintGeneratorTest @@ -146,4 +149,29 @@ public void testGenerateFingerprint_differentVersion() fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 1) ); } + + @Test + public void testRowSignatureIsSorted() + { + RowSignature rowSignature = + RowSignature.builder() + .add("c5", ColumnType.STRING) + .add("c1", ColumnType.FLOAT) + .add("b2", ColumnType.LONG) + .add("d3", ColumnType.DOUBLE) + .add("a1", ColumnType.STRING) + .build(); + + RowSignature sortedSignature = fingerprintGenerator.getLexicographicallySortedSignature(rowSignature); + + Assert.assertNotEquals(rowSignature, sortedSignature); + + List columnNames = sortedSignature.getColumnNames(); + List sortedOrder = Arrays.asList("a1", "b2", "c1", "c5", "d3"); + Assert.assertEquals(sortedOrder, columnNames); + + for (String column : sortedOrder) { + Assert.assertEquals(sortedSignature.getColumnType(column), rowSignature.getColumnType(column)); + } + } } From 45c9f9a734082b4d2c125976a5edfbe61eb4afa1 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Fri, 13 Sep 2024 17:27:35 +0530 Subject: [PATCH 3/5] Add test to verify that only a single schema is created in the database for different column permutations --- ...orageCoordinatorSchemaPersistenceTest.java | 108 ++++++++++++++++++ 1 file changed, 108 insertions(+) diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java index 143f8917d782..fc99af763215 100644 --- a/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java +++ b/server/src/test/java/org/apache/druid/metadata/IndexerSqlMetadataStorageCoordinatorSchemaPersistenceTest.java @@ -19,6 +19,7 @@ package org.apache.druid.metadata; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -48,6 +49,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -281,6 +283,112 @@ public void testAnnounceHistoricalSegments() throws IOException segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap); } + @Test + public void testSchemaPermutation() throws JsonProcessingException + { + Set segments = new HashSet<>(); + SegmentSchemaMapping segmentSchemaMapping = new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION); + // Store the first observed column order for each segment for verification purpose + Map> segmentIdSchemaMap = new HashMap<>(); + + RowSignature originalOrder = + RowSignature.builder() + .add("d7", ColumnType.LONG_ARRAY) + .add("b1", ColumnType.FLOAT) + .add("a5", ColumnType.DOUBLE) + .build(); + + // column permutations + List> permutations = Arrays.asList( + Arrays.asList("d7", "a5", "b1"), + Arrays.asList("a5", "b1", "d7"), + Arrays.asList("a5", "d7", "b1"), + Arrays.asList("b1", "d7", "a5"), + Arrays.asList("b1", "a5", "d7"), + Arrays.asList("d7", "a5", "b1") + ); + + boolean first = true; + + Random random = ThreadLocalRandom.current(); + Random permutationRandom = ThreadLocalRandom.current(); + + for (int i = 0; i < 105; i++) { + DataSegment segment = new DataSegment( + "fooDataSource", + Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), + "version", + ImmutableMap.of(), + ImmutableList.of("dim1"), + ImmutableList.of("m1"), + new LinearShardSpec(i), + 9, + 100 + ); + segments.add(segment); + + int randomNum = random.nextInt(); + + RowSignature rowSignature; + + if (first) { + rowSignature = originalOrder; + } else { + RowSignature.Builder builder = RowSignature.builder(); + List columns = permutations.get(permutationRandom.nextInt(permutations.size())); + + for (String column : columns) { + builder.add(column, originalOrder.getColumnType(column).get()); + } + + rowSignature = builder.build(); + } + + SchemaPayload schemaPayload = new SchemaPayload(rowSignature); + segmentIdSchemaMap.put(segment.getId().toString(), Pair.of(new SchemaPayload(originalOrder), randomNum)); + segmentSchemaMapping.addSchema( + segment.getId(), + new SchemaPayloadPlus(schemaPayload, (long) randomNum), + fingerprintGenerator.generateFingerprint( + schemaPayload, + segment.getDataSource(), + CentralizedDatasourceSchemaConfig.SCHEMA_VERSION + ) + ); + + if (first) { + coordinator.commitSegments(segments, segmentSchemaMapping); + first = false; + } + } + + coordinator.commitSegments(segments, segmentSchemaMapping); + for (DataSegment segment : segments) { + Assert.assertArrayEquals( + mapper.writeValueAsString(segment).getBytes(StandardCharsets.UTF_8), + derbyConnector.lookup( + derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(), + "id", + "payload", + segment.getId().toString() + ) + ); + } + + List segmentIds = segments.stream() + .map(segment -> segment.getId().toString()) + .sorted(Comparator.naturalOrder()) + .collect(Collectors.toList()); + + Assert.assertEquals(segmentIds, retrieveUsedSegmentIds(derbyConnectorRule.metadataTablesConfigSupplier().get())); + + // Should not update dataSource metadata. + Assert.assertEquals(0, metadataUpdateCounter.get()); + + // verify that only a single schema is created + segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap); + } + @Test public void testAnnounceHistoricalSegments_schemaExists() throws IOException { From 9b2735a3a5ebaa0d07dd43cf24eff434e7d1bb04 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Tue, 17 Sep 2024 10:29:41 +0530 Subject: [PATCH 4/5] Update docs --- .../apache/druid/segment/metadata/FingerprintGenerator.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java index ea100cb0aa63..7d35b180c62f 100644 --- a/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java +++ b/server/src/main/java/org/apache/druid/segment/metadata/FingerprintGenerator.java @@ -40,7 +40,8 @@ import java.util.List; /** - * Utility to generate fingerprint for an object. + * Utility to generate schema fingerprint which is used to ensure schema uniqueness in the metadata database. + * Note, that the generated fingerprint is independent of the column order. */ @LazySingleton public class FingerprintGenerator @@ -62,6 +63,7 @@ public FingerprintGenerator(ObjectMapper objectMapper) public String generateFingerprint(final SchemaPayload schemaPayload, final String dataSource, final int version) { // Sort the column names in lexicographic order + // The aggregator factories are column order independent since they are stored in a hashmap // This ensures that all permutations of a given columns would result in the same fingerprint // thus avoiding schema explosion in the metadata database // Note that this signature is not persisted anywhere, it is only used for fingerprint computation From 7076e3032db3433aa4d3286f569d38b71f0b60a4 Mon Sep 17 00:00:00 2001 From: rishabh singh Date: Tue, 17 Sep 2024 22:22:42 +0530 Subject: [PATCH 5/5] checkstyle --- .../apache/druid/segment/metadata/FingerprintGeneratorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java b/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java index 1293583ed33c..c0100ffe1513 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/FingerprintGeneratorTest.java @@ -32,7 +32,6 @@ import org.junit.Test; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map;