Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.segment.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.io.BaseEncoding;
Expand All @@ -30,11 +31,17 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
* Utility to generate fingerprint for an object.
* Utility to generate schema fingerprint which is used to ensure schema uniqueness in the metadata database.
* Note, that the generated fingerprint is independent of the column order.
*/
@LazySingleton
public class FingerprintGenerator
Expand All @@ -53,12 +60,20 @@ public FingerprintGenerator(ObjectMapper objectMapper)
* Generates fingerprint or hash string for an object using SHA-256 hash algorithm.
*/
@SuppressWarnings("UnstableApiUsage")
public String generateFingerprint(SchemaPayload schemaPayload, String dataSource, int version)
public String generateFingerprint(final SchemaPayload schemaPayload, final String dataSource, final int version)
{
// Sort the column names in lexicographic order
// The aggregator factories are column order independent since they are stored in a hashmap
// This ensures that all permutations of a given columns would result in the same fingerprint
// thus avoiding schema explosion in the metadata database
// Note that this signature is not persisted anywhere, it is only used for fingerprint computation
final RowSignature sortedSignature = getLexicographicallySortedSignature(schemaPayload.getRowSignature());
final SchemaPayload updatedPayload = new SchemaPayload(sortedSignature, schemaPayload.getAggregatorFactories());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please mention a note about the aggregator factories as well that they are column order independent since they are backed by a map.

try {

final Hasher hasher = Hashing.sha256().newHasher();

hasher.putBytes(objectMapper.writeValueAsBytes(schemaPayload));
hasher.putBytes(objectMapper.writeValueAsBytes(updatedPayload));
// add delimiter, inspired from org.apache.druid.metadata.PendingSegmentRecord.computeSequenceNamePrevIdSha1
hasher.putByte((byte) 0xff);

Expand All @@ -82,4 +97,21 @@ public String generateFingerprint(SchemaPayload schemaPayload, String dataSource
);
}
}

@VisibleForTesting
protected RowSignature getLexicographicallySortedSignature(final RowSignature rowSignature)
{
final List<String> columns = new ArrayList<>(rowSignature.getColumnNames());

Collections.sort(columns);

final RowSignature.Builder sortedSignature = RowSignature.builder();

for (String column : columns) {
ColumnType type = rowSignature.getColumnType(column).orElse(null);
sortedSignature.add(column, type);
}

return sortedSignature.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.druid.metadata;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -48,6 +49,7 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -281,6 +283,112 @@ public void testAnnounceHistoricalSegments() throws IOException
segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap);
}

@Test
public void testSchemaPermutation() throws JsonProcessingException
{
Set<DataSegment> segments = new HashSet<>();
SegmentSchemaMapping segmentSchemaMapping = new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION);
// Store the first observed column order for each segment for verification purpose
Map<String, Pair<SchemaPayload, Integer>> segmentIdSchemaMap = new HashMap<>();

RowSignature originalOrder =
RowSignature.builder()
.add("d7", ColumnType.LONG_ARRAY)
.add("b1", ColumnType.FLOAT)
.add("a5", ColumnType.DOUBLE)
.build();

// column permutations
List<List<String>> permutations = Arrays.asList(
Arrays.asList("d7", "a5", "b1"),
Arrays.asList("a5", "b1", "d7"),
Arrays.asList("a5", "d7", "b1"),
Arrays.asList("b1", "d7", "a5"),
Arrays.asList("b1", "a5", "d7"),
Arrays.asList("d7", "a5", "b1")
);

boolean first = true;

Random random = ThreadLocalRandom.current();
Random permutationRandom = ThreadLocalRandom.current();

for (int i = 0; i < 105; i++) {
DataSegment segment = new DataSegment(
"fooDataSource",
Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
"version",
ImmutableMap.of(),
ImmutableList.of("dim1"),
ImmutableList.of("m1"),
new LinearShardSpec(i),
9,
100
);
segments.add(segment);

int randomNum = random.nextInt();

RowSignature rowSignature;

if (first) {
rowSignature = originalOrder;
} else {
RowSignature.Builder builder = RowSignature.builder();
List<String> columns = permutations.get(permutationRandom.nextInt(permutations.size()));

for (String column : columns) {
builder.add(column, originalOrder.getColumnType(column).get());
}

rowSignature = builder.build();
}

SchemaPayload schemaPayload = new SchemaPayload(rowSignature);
segmentIdSchemaMap.put(segment.getId().toString(), Pair.of(new SchemaPayload(originalOrder), randomNum));
segmentSchemaMapping.addSchema(
segment.getId(),
new SchemaPayloadPlus(schemaPayload, (long) randomNum),
fingerprintGenerator.generateFingerprint(
schemaPayload,
segment.getDataSource(),
CentralizedDatasourceSchemaConfig.SCHEMA_VERSION
)
);

if (first) {
coordinator.commitSegments(segments, segmentSchemaMapping);
first = false;
}
}

coordinator.commitSegments(segments, segmentSchemaMapping);
for (DataSegment segment : segments) {
Assert.assertArrayEquals(
mapper.writeValueAsString(segment).getBytes(StandardCharsets.UTF_8),
derbyConnector.lookup(
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable(),
"id",
"payload",
segment.getId().toString()
)
);
}

List<String> segmentIds = segments.stream()
.map(segment -> segment.getId().toString())
.sorted(Comparator.naturalOrder())
.collect(Collectors.toList());

Assert.assertEquals(segmentIds, retrieveUsedSegmentIds(derbyConnectorRule.metadataTablesConfigSupplier().get()));

// Should not update dataSource metadata.
Assert.assertEquals(0, metadataUpdateCounter.get());

// verify that only a single schema is created
segmentSchemaTestUtils.verifySegmentSchema(segmentIdSchemaMap);
}

@Test
public void testAnnounceHistoricalSegments_schemaExists() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.any.StringAnyAggregatorFactory;
import org.apache.druid.query.aggregation.firstlast.first.LongFirstAggregatorFactory;
import org.apache.druid.segment.SchemaPayload;
import org.apache.druid.segment.TestHelper;
Expand All @@ -30,7 +31,9 @@
import org.junit.Assert;
import org.junit.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FingerprintGeneratorTest
Expand All @@ -45,13 +48,20 @@ public class FingerprintGeneratorTest
@Test
public void testGenerateFingerprint_precalculatedHash()
{
RowSignature rowSignature = RowSignature.builder().add("c1", ColumnType.FLOAT).build();
RowSignature rowSignature =
RowSignature.builder()
.add("c1", ColumnType.LONG)
.add("c0", ColumnType.STRING)
.add("c2", ColumnType.FLOAT)
.add("c3", ColumnType.DOUBLE)
.build();
Map<String, AggregatorFactory> aggregatorFactoryMap = new HashMap<>();
aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "long-col", null));
aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "c1", null));
aggregatorFactoryMap.put("stringAny", new StringAnyAggregatorFactory("stringAny", "c0", 1024, true));

SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregatorFactoryMap);

String expected = "DEE5E8F59833102F0FA5B10F8B8884EA15220D1D2A5F6097A93D8309132E1039";
String expected = "82E774457D26D0B8D481B6C39872070B25EA3C72C6EFC107B346FA42641740E1";
Assert.assertEquals(expected, fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 0));
}

Expand All @@ -60,25 +70,38 @@ public void testGenerateFingerprint_columnPermutation()
{
RowSignature rowSignature =
RowSignature.builder()
.add("c1", ColumnType.FLOAT)
.add("c2", ColumnType.LONG)
.add("c1", ColumnType.FLOAT)
.add("c3", ColumnType.DOUBLE)
.add("c0", ColumnType.STRING)
.build();

Map<String, AggregatorFactory> aggregatorFactoryMap = new HashMap<>();
aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "long-col", null));
aggregatorFactoryMap.put("longFirst", new LongFirstAggregatorFactory("longFirst", "c2", null));
aggregatorFactoryMap.put("stringAny", new StringAnyAggregatorFactory("stringAny", "c0", 1024, true));

SchemaPayload schemaPayload = new SchemaPayload(rowSignature, aggregatorFactoryMap);

RowSignature rowSignaturePermutation =
RowSignature.builder()
.add("c2", ColumnType.LONG)
.add("c0", ColumnType.STRING)
.add("c3", ColumnType.DOUBLE)
.add("c1", ColumnType.FLOAT)
.build();

SchemaPayload schemaPayloadNew = new SchemaPayload(rowSignaturePermutation, aggregatorFactoryMap);
Assert.assertNotEquals(
Map<String, AggregatorFactory> aggregatorFactoryMapForPermutation = new HashMap<>();
aggregatorFactoryMapForPermutation.put(
"stringAny",
new StringAnyAggregatorFactory("stringAny", "c0", 1024, true)
);
aggregatorFactoryMapForPermutation.put(
"longFirst",
new LongFirstAggregatorFactory("longFirst", "c2", null)
);

SchemaPayload schemaPayloadNew = new SchemaPayload(rowSignaturePermutation, aggregatorFactoryMapForPermutation);
Assert.assertEquals(
fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 0),
fingerprintGenerator.generateFingerprint(schemaPayloadNew, "ds", 0)
);
Expand Down Expand Up @@ -125,4 +148,29 @@ public void testGenerateFingerprint_differentVersion()
fingerprintGenerator.generateFingerprint(schemaPayload, "ds", 1)
);
}

@Test
public void testRowSignatureIsSorted()
{
RowSignature rowSignature =
RowSignature.builder()
.add("c5", ColumnType.STRING)
.add("c1", ColumnType.FLOAT)
.add("b2", ColumnType.LONG)
.add("d3", ColumnType.DOUBLE)
.add("a1", ColumnType.STRING)
.build();

RowSignature sortedSignature = fingerprintGenerator.getLexicographicallySortedSignature(rowSignature);

Assert.assertNotEquals(rowSignature, sortedSignature);

List<String> columnNames = sortedSignature.getColumnNames();
List<String> sortedOrder = Arrays.asList("a1", "b2", "c1", "c5", "d3");
Assert.assertEquals(sortedOrder, columnNames);

for (String column : sortedOrder) {
Assert.assertEquals(sortedSignature.getColumnType(column), rowSignature.getColumnType(column));
}
}
}