diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java index 3efbd8d0074e..2929229b4c59 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/DruidSchema.java @@ -19,6 +19,7 @@ package org.apache.druid.sql.calcite.schema; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Predicates; import com.google.common.collect.FluentIterable; @@ -402,7 +403,8 @@ private void addSegment(final DruidServerMetadata server, final DataSegment segm } } - private void removeSegment(final DataSegment segment) + @VisibleForTesting + protected void removeSegment(final DataSegment segment) { synchronized (lock) { log.debug("Segment[%s] is gone.", segment.getId()); @@ -450,7 +452,8 @@ private void removeServerSegment(final DruidServerMetadata server, final DataSeg * Attempt to refresh "segmentSignatures" for a set of segments. Returns the set of segments actually refreshed, * which may be a subset of the asked-for set. */ - private Set refreshSegments(final Set segments) throws IOException + @VisibleForTesting + protected Set refreshSegments(final Set segments) throws IOException { final Set retVal = new HashSet<>(); @@ -506,15 +509,26 @@ private Set refreshSegmentsForDataSource(final String dataSource, f log.debug("Segment[%s] has signature[%s].", segment.getId(), rowSignature); final Map dataSourceSegments = segmentMetadataInfo.get(segment.getDataSource()); - SegmentMetadataHolder holder = dataSourceSegments.get(segment); - SegmentMetadataHolder updatedHolder = SegmentMetadataHolder - .from(holder) - .withRowSignature(rowSignature) - .withNumRows(analysis.getNumRows()) - .build(); - dataSourceSegments.put(segment, updatedHolder); - setSegmentSignature(segment, updatedHolder); - retVal.add(segment); + if (dataSourceSegments == null) { + log.warn("No segment map found with datasource[%s], skipping refresh", segment.getDataSource()); + } else { + SegmentMetadataHolder holder = dataSourceSegments.get(segment); + if (holder == null) { + log.warn( + "No segment[%s] found, skipping refresh", + segment.getId() + ); + } else { + SegmentMetadataHolder updatedHolder = SegmentMetadataHolder + .from(holder) + .withRowSignature(rowSignature) + .withNumRows(analysis.getNumRows()) + .build(); + dataSourceSegments.put(segment, updatedHolder); + setSegmentSignature(segment, updatedHolder); + retVal.add(segment); + } + } } } @@ -628,7 +642,7 @@ private static RowSignature analysisToRowSignature(final SegmentAnalysis analysi return rowSignatureBuilder.build(); } - public Map getSegmentMetadata() + Map getSegmentMetadata() { final Map segmentMetadata = new HashMap<>(); synchronized (lock) { diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java index 1b4e00819bb6..30b99e59ab44 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidSchemaTest.java @@ -63,6 +63,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Set; public class DruidSchemaTest extends CalciteTestBase { @@ -237,4 +238,38 @@ public void testGetTableMapFoo2() Assert.assertEquals("m1", fields.get(2).getName()); Assert.assertEquals(SqlTypeName.BIGINT, fields.get(2).getType().getSqlTypeName()); } + + @Test + public void testNullDatasource() throws IOException + { + Map segmentMetadatas = schema.getSegmentMetadata(); + Set segments = segmentMetadatas.keySet(); + Assert.assertEquals(segments.size(), 3); + // segments contains two segments with datasource "foo" and one with datasource "foo2" + // let's remove the only segment with datasource "foo2" + final DataSegment segmentToRemove = segments.stream().filter(segment -> segment.getDataSource().equals("foo2")).findFirst().orElse(null); + Assert.assertFalse(segmentToRemove == null); + schema.removeSegment(segmentToRemove); + schema.refreshSegments(segments); // can cause NPE without dataSourceSegments null check in DruidSchema#refreshSegmentsForDataSource + segmentMetadatas = schema.getSegmentMetadata(); + segments = segmentMetadatas.keySet(); + Assert.assertEquals(segments.size(), 2); + } + + @Test + public void testNullSegmentMetadataHolder() throws IOException + { + Map segmentMetadatas = schema.getSegmentMetadata(); + Set segments = segmentMetadatas.keySet(); + Assert.assertEquals(segments.size(), 3); + //remove one of the segments with datasource "foo" + final DataSegment segmentToRemove = segments.stream().filter(segment -> segment.getDataSource().equals("foo")).findFirst().orElse(null); + Assert.assertFalse(segmentToRemove == null); + schema.removeSegment(segmentToRemove); + schema.refreshSegments(segments); // can cause NPE without holder null check in SegmentMetadataHolder#from + segmentMetadatas = schema.getSegmentMetadata(); + segments = segmentMetadatas.keySet(); + Assert.assertEquals(segments.size(), 2); + } + }