From e8b0a237084393768734d768ffe1b821cfc9cdd3 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 22 Jan 2019 15:14:42 -0800 Subject: [PATCH 01/26] Add published segment cache in broker --- .../druid/client/DataSegmentInterner.java | 35 ++++ .../druid/client/MetadataSegmentView.java | 170 ++++++++++++++++ .../druid/client/selector/ServerSelector.java | 5 +- .../sql/calcite/schema/SystemSchema.java | 91 ++------- .../sql/calcite/schema/SystemSchemaTest.java | 187 +++++++----------- .../druid/sql/calcite/util/CalciteTests.java | 9 + 6 files changed, 297 insertions(+), 200 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/client/DataSegmentInterner.java create mode 100644 server/src/main/java/org/apache/druid/client/MetadataSegmentView.java diff --git a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java new file mode 100644 index 000000000000..f53c802bdb16 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.client; + +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; +import org.apache.druid.timeline.DataSegment; + +public class DataSegmentInterner +{ + public static final Interner INTERNER = Interners.newWeakInterner(); + + private DataSegmentInterner() + { + + } + +} diff --git a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java new file mode 100644 index 000000000000..e5d2de238165 --- /dev/null +++ b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.client; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Interner; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.inject.Inject; +import org.apache.druid.client.coordinator.Coordinator; +import org.apache.druid.discovery.DruidLeaderClient; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; +import org.apache.druid.timeline.DataSegment; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.DateTime; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@ManageLifecycle +public class MetadataSegmentView +{ + + private static final Interner DATASEGMENT_INTERNER = DataSegmentInterner.INTERNER; + private static final int DEFAULT_POLL_PERIOD_IN_MS = 60000; + private static final EmittingLogger log = new EmittingLogger(MetadataSegmentView.class); + + private final DruidLeaderClient coordinatorDruidLeaderClient; + private final ObjectMapper jsonMapper; + private final BytesAccumulatingResponseHandler responseHandler; + private final BrokerSegmentWatcherConfig segmentWatcherConfig; + + private final Map publishedSegments = new ConcurrentHashMap<>(); + private 
ScheduledExecutorService scheduledExec; + + @Inject + public MetadataSegmentView( + final @Coordinator DruidLeaderClient druidLeaderClient, + ObjectMapper jsonMapper, + BytesAccumulatingResponseHandler responseHandler, + final BrokerSegmentWatcherConfig segmentWatcherConfig + ) + { + this.coordinatorDruidLeaderClient = druidLeaderClient; + this.jsonMapper = jsonMapper; + this.responseHandler = responseHandler; + this.segmentWatcherConfig = segmentWatcherConfig; + } + + @LifecycleStart + public void start() + { + scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); + scheduledExec.scheduleWithFixedDelay( + () -> poll(), + 0, + DEFAULT_POLL_PERIOD_IN_MS, + TimeUnit.MILLISECONDS + ); + } + + @LifecycleStop + public void stop() + { + scheduledExec.shutdownNow(); + scheduledExec = null; + } + + private void poll() + { + log.debug("Start polling published segments from coordinator"); + //get authorized published segments from coordinator + final JsonParserIterator metadataSegments = getMetadataSegments( + coordinatorDruidLeaderClient, + jsonMapper, + responseHandler + ); + + final DateTime ts = DateTimes.nowUtc(); + while (metadataSegments.hasNext()) { + final DataSegment interned = DATASEGMENT_INTERNER.intern(metadataSegments.next()); + publishedSegments.put(interned, ts); + } + // filter the segments from cache which may not be present in subsequent polling + publishedSegments.values().removeIf(v -> v != ts); + + if (segmentWatcherConfig.getWatchedDataSources() != null) { + log.debug( + "filtering datasources[%s] in published segments based on broker's watchedDataSources", + segmentWatcherConfig.getWatchedDataSources() + ); + publishedSegments.keySet() + .removeIf(key -> !segmentWatcherConfig.getWatchedDataSources().contains(key.getDataSource())); + } + } + + public Iterator getPublishedSegments() + { + return publishedSegments.keySet().iterator(); + } + + // Note that coordinator must be up to get segments + private static JsonParserIterator 
getMetadataSegments( + DruidLeaderClient coordinatorClient, + ObjectMapper jsonMapper, + BytesAccumulatingResponseHandler responseHandler + ) + { + Request request; + try { + request = coordinatorClient.makeRequest( + HttpMethod.GET, + StringUtils.format("/druid/coordinator/v1/metadata/segments"), + false + ); + } + catch (IOException e) { + throw new RuntimeException(e); + } + ListenableFuture future = coordinatorClient.goAsync( + request, + responseHandler + ); + + final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() + { + }); + return new JsonParserIterator<>( + typeRef, + future, + request.getUrl().toString(), + null, + request.getUrl().getHost(), + jsonMapper, + responseHandler + ); + } + +} diff --git a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java index de6b58be3854..0e3d0bac3589 100644 --- a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java +++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java @@ -19,7 +19,9 @@ package org.apache.druid.client.selector; +import com.google.common.collect.Interner; import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; +import org.apache.druid.client.DataSegmentInterner; import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.coordination.ServerType; import org.apache.druid.timeline.DataSegment; @@ -37,6 +39,7 @@ public class ServerSelector implements DiscoverySelector { + private static final Interner DATASEGMENT_INTERNER = DataSegmentInterner.INTERNER; private final Int2ObjectRBTreeMap> historicalServers; private final Int2ObjectRBTreeMap> realtimeServers; @@ -50,7 +53,7 @@ public ServerSelector( TierSelectorStrategy strategy ) { - this.segment = new AtomicReference<>(segment); + this.segment = new AtomicReference<>(DATASEGMENT_INTERNER.intern(segment)); this.strategy = strategy; 
this.historicalServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 772c62886f4e..d07e3c62a8f2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -45,6 +45,7 @@ import org.apache.calcite.schema.impl.AbstractTable; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.JsonParserIterator; +import org.apache.druid.client.MetadataSegmentView; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingService; @@ -84,6 +85,7 @@ import java.util.Objects; import java.util.Set; + public class SystemSchema extends AbstractSchema { public static final String NAME = "sys"; @@ -149,6 +151,7 @@ public class SystemSchema extends AbstractSchema @Inject public SystemSchema( final DruidSchema druidSchema, + final MetadataSegmentView metadataView, final TimelineServerView serverView, final AuthorizerMapper authorizerMapper, final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient, @@ -158,11 +161,10 @@ public SystemSchema( { Preconditions.checkNotNull(serverView, "serverView"); BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler(); - SegmentsTable segmentsTable = new SegmentsTable( + final SegmentsTable segmentsTable = new SegmentsTable( druidSchema, - coordinatorDruidLeaderClient, + metadataView, jsonMapper, - responseHandler, authorizerMapper ); this.tableMap = ImmutableMap.of( @@ -182,23 +184,20 @@ public Map getTableMap() static class SegmentsTable extends AbstractTable implements ScannableTable { private final DruidSchema druidSchema; - private final 
DruidLeaderClient druidLeaderClient; private final ObjectMapper jsonMapper; - private final BytesAccumulatingResponseHandler responseHandler; private final AuthorizerMapper authorizerMapper; + private final MetadataSegmentView metadataView; public SegmentsTable( DruidSchema druidSchemna, - DruidLeaderClient druidLeaderClient, + MetadataSegmentView metadataView, ObjectMapper jsonMapper, - BytesAccumulatingResponseHandler responseHandler, AuthorizerMapper authorizerMapper ) { this.druidSchema = druidSchemna; - this.druidLeaderClient = druidLeaderClient; + this.metadataView = metadataView; this.jsonMapper = jsonMapper; - this.responseHandler = responseHandler; this.authorizerMapper = authorizerMapper; } @@ -231,21 +230,14 @@ public Enumerable scan(DataContext root) partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData); } - //get published segments from coordinator - final JsonParserIterator metadataSegments = getMetadataSegments( - druidLeaderClient, - jsonMapper, - responseHandler - ); + //get published segments from metadata segment cache + final Iterator pubSegments = metadataView.getPublishedSegments(); final Set segmentsAlreadySeen = new HashSet<>(); final FluentIterable publishedSegments = FluentIterable - .from(() -> getAuthorizedPublishedSegments( - metadataSegments, - root - )) - .transform((DataSegment val) -> { + .from(() -> pubSegments) + .transform(val -> { try { segmentsAlreadySeen.add(val.getId()); final PartialSegmentData partialSegmentData = partialSegmentDataMap.get(val.getId()); @@ -340,27 +332,6 @@ private Iterator> getAuthorizedAvailab return authorizedSegments.iterator(); } - private CloseableIterator getAuthorizedPublishedSegments( - JsonParserIterator it, - DataContext root - ) - { - final AuthenticationResult authenticationResult = - (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); - - Function> raGenerator = segment -> Collections.singletonList( - 
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource())); - - final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( - authenticationResult, - () -> it, - raGenerator, - authorizerMapper - ); - - return wrap(authorizedSegments.iterator(), it); - } - private static class PartialSegmentData { private final long isAvailable; @@ -404,44 +375,6 @@ public long getNumRows() } } - // Note that coordinator must be up to get segments - private static JsonParserIterator getMetadataSegments( - DruidLeaderClient coordinatorClient, - ObjectMapper jsonMapper, - BytesAccumulatingResponseHandler responseHandler - ) - { - - Request request; - try { - request = coordinatorClient.makeRequest( - HttpMethod.GET, - StringUtils.format("/druid/coordinator/v1/metadata/segments"), - false - ); - } - catch (IOException e) { - throw new RuntimeException(e); - } - ListenableFuture future = coordinatorClient.goAsync( - request, - responseHandler - ); - - final JavaType typeRef = jsonMapper.getTypeFactory().constructType(new TypeReference() - { - }); - return new JsonParserIterator<>( - typeRef, - future, - request.getUrl().toString(), - null, - request.getUrl().getHost(), - jsonMapper, - responseHandler - ); - } - static class ServersTable extends AbstractTable implements ScannableTable { private final TimelineServerView serverView; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 407ff69ed955..b4d0743d6ea1 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -37,6 +37,7 @@ import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; +import org.apache.druid.client.MetadataSegmentView; import 
org.apache.druid.client.TimelineServerView; import org.apache.druid.data.input.InputRow; import org.apache.druid.discovery.DruidLeaderClient; @@ -98,6 +99,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; public class SystemSchemaTest extends CalciteTestBase { @@ -127,6 +131,7 @@ public class SystemSchemaTest extends CalciteTestBase private AuthorizerMapper authMapper; private static QueryRunnerFactoryConglomerate conglomerate; private static Closer resourceCloser; + private MetadataSegmentView metadataView; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -215,8 +220,10 @@ public Authorizer getAuthorizer(String name) ); druidSchema.start(); druidSchema.awaitInitialization(); + metadataView = EasyMock.createMock(MetadataSegmentView.class); schema = new SystemSchema( druidSchema, + metadataView, serverView, EasyMock.createStrictMock(AuthorizerMapper.class), client, @@ -225,6 +232,44 @@ public Authorizer getAuthorizer(String name) ); } + + private final DataSegment publishedSegment1 = new DataSegment( + "wikipedia1", + Intervals.of("2007/2008"), + "version1", + null, + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("met1", "met2"), + null, + 1, + 53000L, + DataSegment.PruneLoadSpecHolder.DEFAULT + ); + private final DataSegment publishedSegment2 = new DataSegment( + "wikipedia2", + Intervals.of("2008/2009"), + "version2", + null, + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("met1", "met2"), + null, + 1, + 83000L, + DataSegment.PruneLoadSpecHolder.DEFAULT + ); + private final DataSegment publishedSegment3 = new DataSegment( + "wikipedia3", + Intervals.of("2009/2010"), + "version3", + null, + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("met1", "met2"), + null, + 1, + 47000L, + DataSegment.PruneLoadSpecHolder.DEFAULT + ); + private final DataSegment segment1 = new DataSegment( "test1", 
Intervals.of("2010/2011"), @@ -263,7 +308,7 @@ public Authorizer getAuthorizer(String name) ); private final DataSegment segment4 = new DataSegment( "test4", - Intervals.of("2017/2018"), + Intervals.of("2014/2015"), "version4", null, ImmutableList.of("dim1", "dim2"), @@ -275,7 +320,7 @@ public Authorizer getAuthorizer(String name) ); private final DataSegment segment5 = new DataSegment( "test5", - Intervals.of("2017/2018"), + Intervals.of("2015/2016"), "version5", null, ImmutableList.of("dim1", "dim2"), @@ -340,120 +385,22 @@ public void testGetTableMap() } @Test - public void testSegmentsTable() throws Exception + public void testSegmentsTable() { final SystemSchema.SegmentsTable segmentsTable = EasyMock .createMockBuilder(SystemSchema.SegmentsTable.class) - .withConstructor(druidSchema, client, mapper, responseHandler, authMapper) + .withConstructor(druidSchema, metadataView, mapper, authMapper) .createMock(); EasyMock.replay(segmentsTable); - - EasyMock - .expect(client.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/metadata/segments", false)) - .andReturn(request) - .anyTimes(); - SettableFuture future = SettableFuture.create(); - EasyMock.expect(client.goAsync(request, responseHandler)).andReturn(future).once(); - final int ok = HttpServletResponse.SC_OK; - EasyMock.expect(responseHandler.getStatus()).andReturn(ok).anyTimes(); - - EasyMock - .expect(request.getUrl()) - .andReturn(new URL("http://test-host:1234/druid/coordinator/v1/metadata/segments")) - .anyTimes(); - - AppendableByteArrayInputStream in = new AppendableByteArrayInputStream(); - //segments in metadata store : wikipedia1, wikipedia2, wikipedia3, test1, test2 - final String json = "[{\n" - + "\t\"dataSource\": \"wikipedia1\",\n" - + "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n" - + "\t\"version\": \"2018-08-07T23:00:00.059Z\",\n" - + "\t\"loadSpec\": {\n" - + "\t\t\"type\": \"local\",\n" - + "\t\t\"path\": 
\"/var/druid/segments/wikipedia-kafka/2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z/2018-08-07T23:00:00.059Z/51/1578eb79-0e44-4b41-a87b-65e40c52be53/index.zip\"\n" - + "\t},\n" - + "\t\"dimensions\": \"isRobot,channel,flags,isUnpatrolled,page,diffUrl,comment,isNew,isMinor,user,namespace,commentLength,deltaBucket,cityName,countryIsoCode,countryName,isAnonymous,regionIsoCode,regionName,added,deleted,delta\",\n" - + "\t\"metrics\": \"count,user_unique\",\n" - + "\t\"shardSpec\": {\n" - + "\t\t\"type\": \"none\",\n" - + "\t\t\"partitionNum\": 51,\n" - + "\t\t\"partitions\": 0\n" - + "\t},\n" - + "\t\"binaryVersion\": 9,\n" - + "\t\"size\": 47406,\n" - + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_51\"\n" - + "}, {\n" - + "\t\"dataSource\": \"wikipedia2\",\n" - + "\t\"interval\": \"2018-08-07T18:00:00.000Z/2018-08-07T19:00:00.000Z\",\n" - + "\t\"version\": \"2018-08-07T18:00:00.117Z\",\n" - + "\t\"loadSpec\": {\n" - + "\t\t\"type\": \"local\",\n" - + "\t\t\"path\": \"/var/druid/segments/wikipedia-kafka/2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z/2018-08-07T18:00:00.117Z/9/a2646827-b782-424c-9eed-e48aa448d2c5/index.zip\"\n" - + "\t},\n" - + "\t\"dimensions\": \"isRobot,channel,flags,isUnpatrolled,page,diffUrl,comment,isNew,isMinor,user,namespace,commentLength,deltaBucket,cityName,countryIsoCode,countryName,isAnonymous,metroCode,regionIsoCode,regionName,added,deleted,delta\",\n" - + "\t\"metrics\": \"count,user_unique\",\n" - + "\t\"shardSpec\": {\n" - + "\t\t\"type\": \"none\",\n" - + "\t\t\"partitionNum\": 9,\n" - + "\t\t\"partitions\": 0\n" - + "\t},\n" - + "\t\"binaryVersion\": 9,\n" - + "\t\"size\": 83846,\n" - + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z_9\"\n" - + "}, {\n" - + "\t\"dataSource\": \"wikipedia3\",\n" - + "\t\"interval\": \"2018-08-07T23:00:00.000Z/2018-08-08T00:00:00.000Z\",\n" - + "\t\"version\": 
\"2018-08-07T23:00:00.059Z\",\n" - + "\t\"loadSpec\": {\n" - + "\t\t\"type\": \"local\",\n" - + "\t\t\"path\": \"/var/druid/segments/wikipedia-kafka/2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z/2018-08-07T23:00:00.059Z/50/87c5457e-c39b-4c03-9df8-e2b20b210dfc/index.zip\"\n" - + "\t},\n" - + "\t\"dimensions\": \"isRobot,channel,flags,isUnpatrolled,page,diffUrl,comment,isNew,isMinor,user,namespace,commentLength,deltaBucket,cityName,countryIsoCode,countryName,isAnonymous,metroCode,regionIsoCode,regionName,added,deleted,delta\",\n" - + "\t\"metrics\": \"count,user_unique\",\n" - + "\t\"shardSpec\": {\n" - + "\t\t\"type\": \"none\",\n" - + "\t\t\"partitionNum\": 50,\n" - + "\t\t\"partitions\": 0\n" - + "\t},\n" - + "\t\"binaryVersion\": 9,\n" - + "\t\"size\": 53527,\n" - + "\t\"identifier\": \"wikipedia-kafka_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z_50\"\n" - + "}, {\n" - + "\t\"dataSource\": \"test1\",\n" - + "\t\"interval\": \"2010-01-01T00:00:00.000Z/2011-01-01T00:00:00.000Z\",\n" - + "\t\"version\": \"version1\",\n" - + "\t\"loadSpec\": null,\n" - + "\t\"dimensions\": \"dim1,dim2\",\n" - + "\t\"metrics\": \"met1,met2\",\n" - + "\t\"shardSpec\": {\n" - + "\t\t\"type\": \"none\",\n" - + "\t\t\"domainDimensions\": []\n" - + "\t},\n" - + "\t\"binaryVersion\": 1,\n" - + "\t\"size\": 100,\n" - + "\t\"identifier\": \"test1_2010-01-01T00:00:00.000Z_2011-01-01T00:00:00.000Z_version1\"\n" - + "}, {\n" - + "\t\"dataSource\": \"test2\",\n" - + "\t\"interval\": \"2011-01-01T00:00:00.000Z/2012-01-01T00:00:00.000Z\",\n" - + "\t\"version\": \"version2\",\n" - + "\t\"loadSpec\": null,\n" - + "\t\"dimensions\": \"dim1,dim2\",\n" - + "\t\"metrics\": \"met1,met2\",\n" - + "\t\"shardSpec\": {\n" - + "\t\t\"type\": \"none\",\n" - + "\t\t\"domainDimensions\": []\n" - + "\t},\n" - + "\t\"binaryVersion\": 1,\n" - + "\t\"size\": 100,\n" - + "\t\"identifier\": \"test2_2011-01-01T00:00:00.000Z_2012-01-01T00:00:00.000Z_version2\"\n" - + "}]"; - byte[] 
bytesToWrite = json.getBytes(StandardCharsets.UTF_8); - in.add(bytesToWrite); - in.done(); - future.set(in); - - EasyMock.replay(client, request, responseHolder, responseHandler); + final Set publishedSegments = Stream.of(publishedSegment1, + publishedSegment2, + publishedSegment3, + segment1, + segment2).collect(Collectors.toSet()); + EasyMock.expect(metadataView.getPublishedSegments()).andReturn(publishedSegments.iterator()).once(); + + EasyMock.replay(client, request, responseHolder, responseHandler, metadataView); DataContext dataContext = new DataContext() { @Override @@ -531,7 +478,7 @@ public Object get(String name) verifyRow( rows.get(3), - "test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", + "test4_2014-01-01T00:00:00.000Z_2015-01-01T00:00:00.000Z_version4", 100L, 0L, //partition_num 1L, //num_replicas @@ -543,7 +490,7 @@ public Object get(String name) verifyRow( rows.get(4), - "test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", + "test5_2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z_version5", 100L, 0L, //partition_num 1L, //num_replicas @@ -556,8 +503,8 @@ public Object get(String name) // wikipedia segments are published and unavailable, num_replicas is 0 verifyRow( rows.get(5), - "wikipedia1_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z", - 47406L, + "wikipedia1_2007-01-01T00:00:00.000Z_2008-01-01T00:00:00.000Z_version1", + 53000L, 0L, //partition_num 0L, //num_replicas 0L, //numRows @@ -568,8 +515,8 @@ public Object get(String name) verifyRow( rows.get(6), - "wikipedia2_2018-08-07T18:00:00.000Z_2018-08-07T19:00:00.000Z_2018-08-07T18:00:00.117Z", - 83846L, + "wikipedia2_2008-01-01T00:00:00.000Z_2009-01-01T00:00:00.000Z_version2", + 83000L, 0L, //partition_num 0L, //num_replicas 0L, //numRows @@ -580,8 +527,8 @@ public Object get(String name) verifyRow( rows.get(7), - "wikipedia3_2018-08-07T23:00:00.000Z_2018-08-08T00:00:00.000Z_2018-08-07T23:00:00.059Z", - 53527L, + 
"wikipedia3_2009-01-01T00:00:00.000Z_2010-01-01T00:00:00.000Z_version3", + 47000L, 0L, //partition_num 0L, //num_replicas 0L, //numRows @@ -736,11 +683,11 @@ public Object get(String name) Object[] row3 = rows.get(3); Assert.assertEquals("server2:1234", row3[0]); - Assert.assertEquals("test4_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version4", row3[1].toString()); + Assert.assertEquals("test4_2014-01-01T00:00:00.000Z_2015-01-01T00:00:00.000Z_version4", row3[1].toString()); Object[] row4 = rows.get(4); Assert.assertEquals("server2:1234", row4[0]); - Assert.assertEquals("test5_2017-01-01T00:00:00.000Z_2018-01-01T00:00:00.000Z_version5", row4[1].toString()); + Assert.assertEquals("test5_2015-01-01T00:00:00.000Z_2016-01-01T00:00:00.000Z_version5", row4[1].toString()); // Verify value types. verifyTypes(rows, SystemSchema.SERVER_SEGMENTS_SIGNATURE); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 0eb0d2ba7c26..8c11bb9cfcc0 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -31,6 +31,8 @@ import com.google.inject.Key; import com.google.inject.Module; import org.apache.curator.x.discovery.ServiceProvider; +import org.apache.druid.client.BrokerSegmentWatcherConfig; +import org.apache.druid.client.MetadataSegmentView; import org.apache.druid.collections.CloseableStupidPool; import org.apache.druid.curator.discovery.ServerDiscoverySelector; import org.apache.druid.data.input.InputRow; @@ -100,6 +102,7 @@ import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory; import org.apache.druid.server.QueryLifecycleFactory; +import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.server.log.NoopRequestLogger; import 
org.apache.druid.server.security.Access; import org.apache.druid.server.security.AllowAllAuthenticator; @@ -623,6 +626,12 @@ public static SystemSchema createMockSystemSchema( }; final SystemSchema schema = new SystemSchema( druidSchema, + new MetadataSegmentView( + druidLeaderClient, + getJsonMapper(), + new BytesAccumulatingResponseHandler(), + new BrokerSegmentWatcherConfig() + ), new TestServerInventoryView(walker.getSegments()), TEST_AUTHORIZER_MAPPER, druidLeaderClient, From 6ec4911aa9f8592c07bbb4ad9a92f481245b5a23 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Sat, 26 Jan 2019 21:46:15 -0800 Subject: [PATCH 02/26] Change the DataSegment interner so it's not based on DataSEgment's equals only and size is preserved if set * Added a trueEquals to DataSegment class --- .../apache/druid/timeline/DataSegment.java | 12 +++++++ .../druid/client/DataSegmentInterner.java | 31 ++++++++++++++++--- .../druid/client/MetadataSegmentView.java | 9 +++--- .../druid/client/selector/ServerSelector.java | 6 ++-- 4 files changed, 47 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/druid/timeline/DataSegment.java b/core/src/main/java/org/apache/druid/timeline/DataSegment.java index ee75ec122e70..2ff37528e404 100644 --- a/core/src/main/java/org/apache/druid/timeline/DataSegment.java +++ b/core/src/main/java/org/apache/druid/timeline/DataSegment.java @@ -323,6 +323,18 @@ public String toString() '}'; } + public boolean trueEquals(Object o) + { + if (!this.equals(o)) { + return false; + } + DataSegment that = (DataSegment) o; + return size == that.size && + dimensions.equals(that.dimensions) && + metrics.equals(that.metrics) && + loadSpec.equals(that.loadSpec); + } + public static Comparator bucketMonthComparator() { return new Comparator() diff --git a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java index f53c802bdb16..679c38be08ed 100644 --- 
a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java +++ b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java @@ -19,17 +19,40 @@ package org.apache.druid.client; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; public class DataSegmentInterner { - public static final Interner INTERNER = Interners.newWeakInterner(); - private DataSegmentInterner() + private final ConcurrentMap knownSegments = new ConcurrentHashMap<>(); + + /** + * This method will return a reference to already existing segment, if an equal and better segment exists. + * A segment is considered better if it has more attributes set. A segment gets updated when + * it moves from realtime to historical, it learns what its `size` and `dimensions`, + * so size is a good indicator, if a segment has more attributes set. 
+ */ + public DataSegment replaceWithBetterSegmentIfPresent(DataSegment segment) { + final DataSegment alreadyExistingSegment = knownSegments.get(segment.getId()); + if (alreadyExistingSegment == null) { + knownSegments.put(segment.getId(), segment); + return segment; + } + if (alreadyExistingSegment.trueEquals(segment)) { + return alreadyExistingSegment; + } else { + if (alreadyExistingSegment.equals(segment) && alreadyExistingSegment.getSize() > segment.getSize()) { + return alreadyExistingSegment; + } else { + return segment; + } + } } } diff --git a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java index e5d2de238165..f2957ef678a1 100644 --- a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java +++ b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Interner; import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; import org.apache.druid.client.coordinator.Coordinator; @@ -52,7 +51,6 @@ public class MetadataSegmentView { - private static final Interner DATASEGMENT_INTERNER = DataSegmentInterner.INTERNER; private static final int DEFAULT_POLL_PERIOD_IN_MS = 60000; private static final EmittingLogger log = new EmittingLogger(MetadataSegmentView.class); @@ -63,6 +61,7 @@ public class MetadataSegmentView private final Map publishedSegments = new ConcurrentHashMap<>(); private ScheduledExecutorService scheduledExec; + private final DataSegmentInterner interner; @Inject public MetadataSegmentView( @@ -76,6 +75,7 @@ public MetadataSegmentView( this.jsonMapper = jsonMapper; this.responseHandler = responseHandler; this.segmentWatcherConfig = segmentWatcherConfig; + this.interner = new 
DataSegmentInterner(); } @LifecycleStart @@ -99,7 +99,7 @@ public void stop() private void poll() { - log.debug("Start polling published segments from coordinator"); + log.info("Start polling published segments from coordinator"); //get authorized published segments from coordinator final JsonParserIterator metadataSegments = getMetadataSegments( coordinatorDruidLeaderClient, @@ -109,7 +109,7 @@ private void poll() final DateTime ts = DateTimes.nowUtc(); while (metadataSegments.hasNext()) { - final DataSegment interned = DATASEGMENT_INTERNER.intern(metadataSegments.next()); + final DataSegment interned = interner.replaceWithBetterSegmentIfPresent(metadataSegments.next()); publishedSegments.put(interned, ts); } // filter the segments from cache which may not be present in subsequent polling @@ -123,6 +123,7 @@ private void poll() publishedSegments.keySet() .removeIf(key -> !segmentWatcherConfig.getWatchedDataSources().contains(key.getDataSource())); } + log.info("Done polling published segments"); } public Iterator getPublishedSegments() diff --git a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java index 0e3d0bac3589..9b2a74e9965f 100644 --- a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java +++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java @@ -19,7 +19,6 @@ package org.apache.druid.client.selector; -import com.google.common.collect.Interner; import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; import org.apache.druid.client.DataSegmentInterner; import org.apache.druid.server.coordination.DruidServerMetadata; @@ -39,7 +38,6 @@ public class ServerSelector implements DiscoverySelector { - private static final Interner DATASEGMENT_INTERNER = DataSegmentInterner.INTERNER; private final Int2ObjectRBTreeMap> historicalServers; private final Int2ObjectRBTreeMap> realtimeServers; @@ -47,13 +45,15 @@ public class 
ServerSelector implements DiscoverySelector private final TierSelectorStrategy strategy; private final AtomicReference segment; + private final DataSegmentInterner interner; public ServerSelector( DataSegment segment, TierSelectorStrategy strategy ) { - this.segment = new AtomicReference<>(DATASEGMENT_INTERNER.intern(segment)); + this.interner = new DataSegmentInterner(); + this.segment = new AtomicReference<>(interner.replaceWithBetterSegmentIfPresent(segment)); this.strategy = strategy; this.historicalServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); From 281600d2e2abe59c897e3d6e386e32105e9d8b4e Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 28 Jan 2019 13:18:48 -0800 Subject: [PATCH 03/26] Use separate interner for realtime and historical segments --- .../druid/client/DataSegmentInterner.java | 32 +++---------------- .../druid/client/MetadataSegmentView.java | 10 ++++-- .../druid/client/selector/ServerSelector.java | 6 ++-- 3 files changed, 15 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java index 679c38be08ed..5e90c67d353a 100644 --- a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java +++ b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java @@ -19,40 +19,18 @@ package org.apache.druid.client; +import com.google.common.collect.Interner; +import com.google.common.collect.Interners; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.SegmentId; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; public class DataSegmentInterner { + public static final Interner REALTIME_INTERNER = Interners.newWeakInterner(); + public static final Interner HISTORICAL_INTERNER = Interners.newWeakInterner(); - private final ConcurrentMap 
knownSegments = new ConcurrentHashMap<>(); - - /** - * This method will return a reference to already existing segment, if an equal and better segment exists. - * A segment is considered better if it has more attributes set. A segment gets updated when - * it moves from realtime to historical, it learns what its `size` and `dimensions`, - * so size is a good indicator, if a segment has more attributes set. - */ - public DataSegment replaceWithBetterSegmentIfPresent(DataSegment segment) + private DataSegmentInterner() { - final DataSegment alreadyExistingSegment = knownSegments.get(segment.getId()); - if (alreadyExistingSegment == null) { - knownSegments.put(segment.getId(), segment); - return segment; - } - if (alreadyExistingSegment.trueEquals(segment)) { - return alreadyExistingSegment; - } else { - if (alreadyExistingSegment.equals(segment) && alreadyExistingSegment.getSize() > segment.getSize()) { - return alreadyExistingSegment; - } else { - return segment; - } - } } } diff --git a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java index f2957ef678a1..5ad4f699450e 100644 --- a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java +++ b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java @@ -61,7 +61,6 @@ public class MetadataSegmentView private final Map publishedSegments = new ConcurrentHashMap<>(); private ScheduledExecutorService scheduledExec; - private final DataSegmentInterner interner; @Inject public MetadataSegmentView( @@ -75,7 +74,6 @@ public MetadataSegmentView( this.jsonMapper = jsonMapper; this.responseHandler = responseHandler; this.segmentWatcherConfig = segmentWatcherConfig; - this.interner = new DataSegmentInterner(); } @LifecycleStart @@ -109,7 +107,13 @@ private void poll() final DateTime ts = DateTimes.nowUtc(); while (metadataSegments.hasNext()) { - final DataSegment interned = 
interner.replaceWithBetterSegmentIfPresent(metadataSegments.next()); + final DataSegment currentSegment = metadataSegments.next(); + final DataSegment interned; + if (currentSegment.getSize() > 0) { + interned = DataSegmentInterner.HISTORICAL_INTERNER.intern(currentSegment); + } else { + interned = DataSegmentInterner.REALTIME_INTERNER.intern(currentSegment); + } publishedSegments.put(interned, ts); } // filter the segments from cache which may not be present in subsequent polling diff --git a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java index 9b2a74e9965f..823255ef974c 100644 --- a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java +++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java @@ -45,15 +45,15 @@ public class ServerSelector implements DiscoverySelector private final TierSelectorStrategy strategy; private final AtomicReference segment; - private final DataSegmentInterner interner; public ServerSelector( DataSegment segment, TierSelectorStrategy strategy ) { - this.interner = new DataSegmentInterner(); - this.segment = new AtomicReference<>(interner.replaceWithBetterSegmentIfPresent(segment)); + this.segment = new AtomicReference<>(segment.getSize() > 0 + ? 
DataSegmentInterner.HISTORICAL_INTERNER.intern(segment) + : DataSegmentInterner.REALTIME_INTERNER.intern(segment)); this.strategy = strategy; this.historicalServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); From 8fed80a4c913fcd2be1b74b91369dad9fe76a997 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 28 Jan 2019 13:25:13 -0800 Subject: [PATCH 04/26] Remove trueEquals as it's not used anymore, change log message --- .../java/org/apache/druid/timeline/DataSegment.java | 12 ------------ .../org/apache/druid/client/MetadataSegmentView.java | 3 +-- 2 files changed, 1 insertion(+), 14 deletions(-) diff --git a/core/src/main/java/org/apache/druid/timeline/DataSegment.java b/core/src/main/java/org/apache/druid/timeline/DataSegment.java index 2ff37528e404..ee75ec122e70 100644 --- a/core/src/main/java/org/apache/druid/timeline/DataSegment.java +++ b/core/src/main/java/org/apache/druid/timeline/DataSegment.java @@ -323,18 +323,6 @@ public String toString() '}'; } - public boolean trueEquals(Object o) - { - if (!this.equals(o)) { - return false; - } - DataSegment that = (DataSegment) o; - return size == that.size && - dimensions.equals(that.dimensions) && - metrics.equals(that.metrics) && - loadSpec.equals(that.loadSpec); - } - public static Comparator bucketMonthComparator() { return new Comparator() diff --git a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java index 5ad4f699450e..80c96e1eaf61 100644 --- a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java +++ b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java @@ -97,7 +97,7 @@ public void stop() private void poll() { - log.info("Start polling published segments from coordinator"); + log.info("polling published segments from coordinator"); //get authorized published segments from coordinator final 
JsonParserIterator metadataSegments = getMetadataSegments( coordinatorDruidLeaderClient, @@ -127,7 +127,6 @@ private void poll() publishedSegments.keySet() .removeIf(key -> !segmentWatcherConfig.getWatchedDataSources().contains(key.getDataSource())); } - log.info("Done polling published segments"); } public Iterator getPublishedSegments() From 0d0f89fc960e13118b32235c738e1bbe70adbbdc Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 28 Jan 2019 16:22:44 -0800 Subject: [PATCH 05/26] PR comments --- .../druid/client/DataSegmentInterner.java | 9 +++++-- .../druid/client/MetadataSegmentView.java | 27 +++++++++---------- .../druid/client/selector/ServerSelector.java | 4 +-- .../sql/calcite/schema/SystemSchema.java | 1 - 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java index 5e90c67d353a..8280b26d0bff 100644 --- a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java +++ b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java @@ -25,12 +25,17 @@ public class DataSegmentInterner { - public static final Interner REALTIME_INTERNER = Interners.newWeakInterner(); - public static final Interner HISTORICAL_INTERNER = Interners.newWeakInterner(); + private static final Interner REALTIME_INTERNER = Interners.newWeakInterner(); + private static final Interner HISTORICAL_INTERNER = Interners.newWeakInterner(); private DataSegmentInterner() { } + public static Interner getInterner(DataSegment segment) + { + return segment.getSize() > 0 ? 
DataSegmentInterner.HISTORICAL_INTERNER : DataSegmentInterner.REALTIME_INTERNER; + } + } diff --git a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java index 80c96e1eaf61..2421a3ded088 100644 --- a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java +++ b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java @@ -32,7 +32,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.timeline.DataSegment; @@ -42,8 +42,8 @@ import java.io.IOException; import java.io.InputStream; import java.util.Iterator; -import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -52,14 +52,14 @@ public class MetadataSegmentView { private static final int DEFAULT_POLL_PERIOD_IN_MS = 60000; - private static final EmittingLogger log = new EmittingLogger(MetadataSegmentView.class); + private static final Logger log = new Logger(MetadataSegmentView.class); private final DruidLeaderClient coordinatorDruidLeaderClient; private final ObjectMapper jsonMapper; private final BytesAccumulatingResponseHandler responseHandler; private final BrokerSegmentWatcherConfig segmentWatcherConfig; - private final Map publishedSegments = new ConcurrentHashMap<>(); + private final ConcurrentMap publishedSegments = new ConcurrentHashMap<>(); private ScheduledExecutorService scheduledExec; @Inject @@ -105,19 +105,18 @@ private void poll() 
responseHandler ); - final DateTime ts = DateTimes.nowUtc(); + final DateTime timestamp = DateTimes.nowUtc(); while (metadataSegments.hasNext()) { final DataSegment currentSegment = metadataSegments.next(); - final DataSegment interned; - if (currentSegment.getSize() > 0) { - interned = DataSegmentInterner.HISTORICAL_INTERNER.intern(currentSegment); - } else { - interned = DataSegmentInterner.REALTIME_INTERNER.intern(currentSegment); - } - publishedSegments.put(interned, ts); + final DataSegment interned = DataSegmentInterner.getInterner(currentSegment).intern(currentSegment); + // timestamp is used to filter deleted segments + publishedSegments.put(interned, timestamp); } - // filter the segments from cache which may not be present in subsequent polling - publishedSegments.values().removeIf(v -> v != ts); + // filter the segments from cache whose timestamp is not equal to latest timestamp stored, + // since the presence of a segment with an earlier timestamp indicates that + // "that" segment is not returned by coordinator in latest poll, so it's + // likely deleted and therefore we remove it from publishedSegments + publishedSegments.values().removeIf(v -> v != timestamp); if (segmentWatcherConfig.getWatchedDataSources() != null) { log.debug( diff --git a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java index 823255ef974c..84b7e4431fda 100644 --- a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java +++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java @@ -51,9 +51,7 @@ public ServerSelector( TierSelectorStrategy strategy ) { - this.segment = new AtomicReference<>(segment.getSize() > 0 - ? 
DataSegmentInterner.HISTORICAL_INTERNER.intern(segment) - : DataSegmentInterner.REALTIME_INTERNER.intern(segment)); + this.segment = new AtomicReference<>(DataSegmentInterner.getInterner(segment).intern(segment)); this.strategy = strategy; this.historicalServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index d07e3c62a8f2..c7bce94fd298 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -85,7 +85,6 @@ import java.util.Objects; import java.util.Set; - public class SystemSchema extends AbstractSchema { public static final String NAME = "sys"; From 8fc84b32ae827ca0c24c3b24083fb71fbd3ee3f0 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 29 Jan 2019 09:19:25 -0800 Subject: [PATCH 06/26] PR comments --- .../druid/client/DataSegmentInterner.java | 4 +-- .../druid/client/MetadataSegmentView.java | 35 +++++++++++++------ .../druid/client/selector/ServerSelector.java | 2 +- .../druid/server/http/MetadataResource.java | 23 ++++++++++-- .../sql/calcite/schema/SystemSchema.java | 27 ++++++++++++-- 5 files changed, 74 insertions(+), 17 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java index 8280b26d0bff..c507ce681e16 100644 --- a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java +++ b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java @@ -33,9 +33,9 @@ private DataSegmentInterner() } - public static Interner getInterner(DataSegment segment) + public static DataSegment intern(DataSegment segment) { - return segment.getSize() > 0 ? 
DataSegmentInterner.HISTORICAL_INTERNER : DataSegmentInterner.REALTIME_INTERNER; + return segment.getSize() > 0 ? HISTORICAL_INTERNER.intern(segment) : REALTIME_INTERNER.intern(segment); } } diff --git a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java index 2421a3ded088..69e75bd933c5 100644 --- a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java +++ b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java @@ -19,6 +19,7 @@ package org.apache.druid.client; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; @@ -32,7 +33,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.timeline.DataSegment; @@ -42,6 +43,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Iterator; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; @@ -52,7 +54,7 @@ public class MetadataSegmentView { private static final int DEFAULT_POLL_PERIOD_IN_MS = 60000; - private static final Logger log = new Logger(MetadataSegmentView.class); + private static final EmittingLogger log = new EmittingLogger(MetadataSegmentView.class); private final DruidLeaderClient coordinatorDruidLeaderClient; private final ObjectMapper jsonMapper; @@ -81,7 +83,14 @@ public void start() { 
scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); scheduledExec.scheduleWithFixedDelay( - () -> poll(), + () -> { + try { + poll(); + } + catch (JsonProcessingException e) { + log.makeAlert(e, "Problem polling Coordinator.").emit(); + } + }, 0, DEFAULT_POLL_PERIOD_IN_MS, TimeUnit.MILLISECONDS @@ -95,20 +104,20 @@ public void stop() scheduledExec = null; } - private void poll() + private void poll() throws JsonProcessingException { log.info("polling published segments from coordinator"); //get authorized published segments from coordinator final JsonParserIterator metadataSegments = getMetadataSegments( coordinatorDruidLeaderClient, jsonMapper, - responseHandler + responseHandler, + segmentWatcherConfig.getWatchedDataSources() ); final DateTime timestamp = DateTimes.nowUtc(); while (metadataSegments.hasNext()) { - final DataSegment currentSegment = metadataSegments.next(); - final DataSegment interned = DataSegmentInterner.getInterner(currentSegment).intern(currentSegment); + final DataSegment interned = DataSegmentInterner.intern(metadataSegments.next()); // timestamp is used to filter deleted segments publishedSegments.put(interned, timestamp); } @@ -137,14 +146,20 @@ public Iterator getPublishedSegments() private static JsonParserIterator getMetadataSegments( DruidLeaderClient coordinatorClient, ObjectMapper jsonMapper, - BytesAccumulatingResponseHandler responseHandler - ) + BytesAccumulatingResponseHandler responseHandler, + Set watchedDataSources + ) throws JsonProcessingException { + String query = "/druid/coordinator/v1/metadata/segments"; + if (watchedDataSources != null && !watchedDataSources.isEmpty()) { + final String datasourcesJson = jsonMapper.writeValueAsString(watchedDataSources); + query = "/druid/coordinator/v1/metadata/segments?" 
+ datasourcesJson; + } Request request; try { request = coordinatorClient.makeRequest( HttpMethod.GET, - StringUtils.format("/druid/coordinator/v1/metadata/segments"), + StringUtils.format(query), false ); } diff --git a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java index 84b7e4431fda..a485dbaa955c 100644 --- a/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java +++ b/server/src/main/java/org/apache/druid/client/selector/ServerSelector.java @@ -51,7 +51,7 @@ public ServerSelector( TierSelectorStrategy strategy ) { - this.segment = new AtomicReference<>(DataSegmentInterner.getInterner(segment).intern(segment)); + this.segment = new AtomicReference<>(DataSegmentInterner.intern(segment)); this.strategy = strategy; this.historicalServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); this.realtimeServers = new Int2ObjectRBTreeMap<>(strategy.getComparator()); diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index d1e83a91f4ea..ce7f8b1e1e64 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.collect.Collections2; @@ -52,11 +53,13 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.TreeSet; +import java.util.stream.Collectors; import 
java.util.stream.Stream; /** @@ -148,9 +151,25 @@ public Response getDatabaseSegmentDataSource(@PathParam("dataSourceName") final @GET @Path("/segments") @Produces(MediaType.APPLICATION_JSON) - public Response getDatabaseSegments(@Context final HttpServletRequest req) + public Response getDatabaseSegments( + @Context final HttpServletRequest req, + @QueryParam("datasources") String datasources + ) + throws IOException { - final Collection druidDataSources = metadataSegmentManager.getDataSources(); + Collection druidDataSources = metadataSegmentManager.getDataSources(); + if (datasources != null && !datasources.isEmpty()) { + Set watchedDatasources = jsonMapper.readValue( + datasources, + jsonMapper.getTypeFactory() + .constructType(new TypeReference>() + { + }) + ); + druidDataSources = druidDataSources.stream() + .filter(src -> watchedDatasources.contains(src.getName())) + .collect(Collectors.toSet()); + } final Stream metadataSegments = druidDataSources .stream() .flatMap(t -> t.getSegments().stream()); diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index c7bce94fd298..969bc46456c0 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -230,12 +230,15 @@ public Enumerable scan(DataContext root) } //get published segments from metadata segment cache - final Iterator pubSegments = metadataView.getPublishedSegments(); + final Iterator metadataSegments = metadataView.getPublishedSegments(); final Set segmentsAlreadySeen = new HashSet<>(); final FluentIterable publishedSegments = FluentIterable - .from(() -> pubSegments) + .from(() -> getAuthorizedPublishedSegments( + metadataSegments, + root + )) .transform(val -> { try { segmentsAlreadySeen.add(val.getId()); @@ -309,6 +312,26 @@ public Enumerable scan(DataContext root) } + private Iterator 
getAuthorizedPublishedSegments( + Iterator it, + DataContext root + ) + { + final AuthenticationResult authenticationResult = + (AuthenticationResult) root.get(PlannerContext.DATA_CTX_AUTHENTICATION_RESULT); + + Function> raGenerator = segment -> Collections.singletonList( + AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource())); + + final Iterable authorizedSegments = AuthorizationUtils.filterAuthorizedResources( + authenticationResult, + () -> it, + raGenerator, + authorizerMapper + ); + return authorizedSegments.iterator(); + } + private Iterator> getAuthorizedAvailableSegments( Iterator> availableSegmentEntries, DataContext root From d46edc5a1d8dcdd94c8fea1cd26622d6158ce99a Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 29 Jan 2019 12:14:54 -0800 Subject: [PATCH 07/26] Fix tests --- .../org/apache/druid/client/CachingClusteredClientTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 2eff321d78f0..9d6807f32647 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -2213,13 +2213,13 @@ private List> populateTimeline( expectedResults.get(k).get(j) ); serverExpectations.get(lastServer).addExpectation(expectation); - + EasyMock.expect(mockSegment.getSize()).andReturn(-1L).anyTimes(); + EasyMock.replay(mockSegment); ServerSelector selector = new ServerSelector( expectation.getSegment(), new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()) ); selector.addServerAndUpdateSegment(new QueryableDruidServer(lastServer, null), selector.getSegment()); - final ShardSpec shardSpec; if (numChunks == 1) { shardSpec = new SingleDimensionShardSpec("dimAll", null, null, 0); @@ -2234,6 +2234,7 @@ private List> 
populateTimeline( } shardSpec = new SingleDimensionShardSpec("dim" + k, start, end, j); } + EasyMock.reset(mockSegment); EasyMock.expect(mockSegment.getShardSpec()) .andReturn(shardSpec) .anyTimes(); From a9934824ee30dc454952be48535f757503f04eed Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Tue, 29 Jan 2019 23:12:52 -0800 Subject: [PATCH 08/26] PR comments --- .../java/org/apache/druid/client/MetadataSegmentView.java | 8 ++++++-- .../apache/druid/client/CachingClusteredClientTest.java | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java index 69e75bd933c5..76198c4acbf7 100644 --- a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java +++ b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java @@ -49,6 +49,10 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +/** + * This class polls the coordinator in background to keep the latest published segments. + * Provides {@link #getPublishedSegments()} for others to get segments in metadata store. 
+ */ @ManageLifecycle public class MetadataSegmentView { @@ -67,8 +71,8 @@ public class MetadataSegmentView @Inject public MetadataSegmentView( final @Coordinator DruidLeaderClient druidLeaderClient, - ObjectMapper jsonMapper, - BytesAccumulatingResponseHandler responseHandler, + final ObjectMapper jsonMapper, + final BytesAccumulatingResponseHandler responseHandler, final BrokerSegmentWatcherConfig segmentWatcherConfig ) { diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index 9d6807f32647..4f043eeaedef 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -2213,7 +2213,7 @@ private List> populateTimeline( expectedResults.get(k).get(j) ); serverExpectations.get(lastServer).addExpectation(expectation); - EasyMock.expect(mockSegment.getSize()).andReturn(-1L).anyTimes(); + EasyMock.expect(mockSegment.getSize()).andReturn(0L).anyTimes(); EasyMock.replay(mockSegment); ServerSelector selector = new ServerSelector( expectation.getSegment(), From a4cbcfc765f3d8a5ac5c5e1c2375b0a68c6621ea Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 30 Jan 2019 13:22:39 -0800 Subject: [PATCH 09/26] Few more modification to * change the coordinator api * removeall segments at once from MetadataSegmentView in order to serve a more consistent view of published segments * Change the poll behaviour to avoid multiple poll execution at same time --- .../druid/client/MetadataSegmentView.java | 50 +++++++++++-------- .../druid/server/http/MetadataResource.java | 14 +----- 2 files changed, 31 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java index 76198c4acbf7..44dbc169bbd0 100644 --- 
a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java +++ b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java @@ -19,7 +19,6 @@ package org.apache.druid.client; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; @@ -33,7 +32,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.timeline.DataSegment; @@ -44,10 +43,12 @@ import java.io.InputStream; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** * This class polls the coordinator in background to keep the latest published segments. 
@@ -58,7 +59,7 @@ public class MetadataSegmentView { private static final int DEFAULT_POLL_PERIOD_IN_MS = 60000; - private static final EmittingLogger log = new EmittingLogger(MetadataSegmentView.class); + private static final Logger log = new Logger(MetadataSegmentView.class); private final DruidLeaderClient coordinatorDruidLeaderClient; private final ObjectMapper jsonMapper; @@ -86,19 +87,7 @@ public MetadataSegmentView( public void start() { scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); - scheduledExec.scheduleWithFixedDelay( - () -> { - try { - poll(); - } - catch (JsonProcessingException e) { - log.makeAlert(e, "Problem polling Coordinator.").emit(); - } - }, - 0, - DEFAULT_POLL_PERIOD_IN_MS, - TimeUnit.MILLISECONDS - ); + scheduledExec.schedule(new Poll(), 0, TimeUnit.MILLISECONDS); } @LifecycleStop @@ -108,7 +97,18 @@ public void stop() scheduledExec = null; } - private void poll() throws JsonProcessingException + private class Poll implements Callable + { + @Override + public Void call() + { + poll(); + scheduledExec.schedule(new Poll(), DEFAULT_POLL_PERIOD_IN_MS, TimeUnit.MILLISECONDS); + return null; + } + } + + private void poll() { log.info("polling published segments from coordinator"); //get authorized published segments from coordinator @@ -129,7 +129,11 @@ private void poll() throws JsonProcessingException // since the presence of a segment with an earlier timestamp indicates that // "that" segment is not returned by coordinator in latest poll, so it's // likely deleted and therefore we remove it from publishedSegments - publishedSegments.values().removeIf(v -> v != timestamp); + Set toBeRemovedSegments = publishedSegments.values() + .stream() + .filter(v -> v != timestamp) + .collect(Collectors.toSet()); + publishedSegments.values().removeAll(toBeRemovedSegments); if (segmentWatcherConfig.getWatchedDataSources() != null) { log.debug( @@ -152,12 +156,16 @@ private static JsonParserIterator getMetadataSegments( 
ObjectMapper jsonMapper, BytesAccumulatingResponseHandler responseHandler, Set watchedDataSources - ) throws JsonProcessingException + ) { String query = "/druid/coordinator/v1/metadata/segments"; if (watchedDataSources != null && !watchedDataSources.isEmpty()) { - final String datasourcesJson = jsonMapper.writeValueAsString(watchedDataSources); - query = "/druid/coordinator/v1/metadata/segments?" + datasourcesJson; + final StringBuilder sb = new StringBuilder(); + for (String ds : watchedDataSources) { + sb.append("datasources=" + ds + "&"); + } + sb.setLength(Math.max(sb.length() - 1, 0)); + query = "/druid/coordinator/v1/metadata/segments?" + sb; } Request request; try { diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index ce7f8b1e1e64..150648c06e29 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonGenerator; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Function; import com.google.common.collect.Collections2; @@ -53,7 +52,6 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; -import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -153,21 +151,13 @@ public Response getDatabaseSegmentDataSource(@PathParam("dataSourceName") final @Produces(MediaType.APPLICATION_JSON) public Response getDatabaseSegments( @Context final HttpServletRequest req, - @QueryParam("datasources") String datasources + @QueryParam("datasources") final Set datasources ) - throws IOException { Collection druidDataSources = metadataSegmentManager.getDataSources(); if 
(datasources != null && !datasources.isEmpty()) { - Set watchedDatasources = jsonMapper.readValue( - datasources, - jsonMapper.getTypeFactory() - .constructType(new TypeReference>() - { - }) - ); druidDataSources = druidDataSources.stream() - .filter(src -> watchedDatasources.contains(src.getName())) + .filter(src -> datasources.contains(src.getName())) .collect(Collectors.toSet()); } final Stream metadataSegments = druidDataSources From 0fbb48c40fb0f4fb426d1cec12efe67da7404b5c Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 30 Jan 2019 14:50:48 -0800 Subject: [PATCH 10/26] minor changes --- .../druid/client/MetadataSegmentView.java | 26 +++++++++---------- .../druid/server/http/MetadataResource.java | 2 +- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java index 44dbc169bbd0..5916ad6c8b1c 100644 --- a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java +++ b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java @@ -87,7 +87,7 @@ public MetadataSegmentView( public void start() { scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); - scheduledExec.schedule(new Poll(), 0, TimeUnit.MILLISECONDS); + scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS); } @LifecycleStop @@ -97,17 +97,6 @@ public void stop() scheduledExec = null; } - private class Poll implements Callable - { - @Override - public Void call() - { - poll(); - scheduledExec.schedule(new Poll(), DEFAULT_POLL_PERIOD_IN_MS, TimeUnit.MILLISECONDS); - return null; - } - } - private void poll() { log.info("polling published segments from coordinator"); @@ -129,7 +118,7 @@ private void poll() // since the presence of a segment with an earlier timestamp indicates that // "that" segment is not returned by coordinator in latest poll, so it's // likely deleted and therefore we remove 
it from publishedSegments - Set toBeRemovedSegments = publishedSegments.values() + final Set toBeRemovedSegments = publishedSegments.values() .stream() .filter(v -> v != timestamp) .collect(Collectors.toSet()); @@ -197,4 +186,15 @@ private static JsonParserIterator getMetadataSegments( ); } + private class PollTask implements Callable + { + @Override + public Void call() + { + poll(); + scheduledExec.schedule(new PollTask(), DEFAULT_POLL_PERIOD_IN_MS, TimeUnit.MILLISECONDS); + return null; + } + } + } diff --git a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java index 150648c06e29..c7e270214ff4 100644 --- a/server/src/main/java/org/apache/druid/server/http/MetadataResource.java +++ b/server/src/main/java/org/apache/druid/server/http/MetadataResource.java @@ -164,7 +164,7 @@ public Response getDatabaseSegments( .stream() .flatMap(t -> t.getSegments().stream()); - Function> raGenerator = segment -> Collections.singletonList( + final Function> raGenerator = segment -> Collections.singletonList( AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR.apply(segment.getDataSource())); final Iterable authorizedSegments = From 8df2d9670431c33ea892d2b49815c76c951194f3 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 30 Jan 2019 17:16:44 -0800 Subject: [PATCH 11/26] PR comments --- .../druid/client/DataSegmentInterner.java | 9 ++++ .../druid/client/MetadataSegmentView.java | 52 ++++++++++--------- 2 files changed, 36 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java index c507ce681e16..e2e9608874c9 100644 --- a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java +++ b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java @@ -23,6 +23,12 @@ import com.google.common.collect.Interners; import 
org.apache.druid.timeline.DataSegment; +/** + * Interns the DataSegment object in order to share the reference for same DataSegment. + * It uses two separate interners for realtime and historical segments to prevent + * overwriting the size of a segment which was served by a historical and later served + * by another realtime server, since realtime server always publishes with size 0. + */ public class DataSegmentInterner { private static final Interner REALTIME_INTERNER = Interners.newWeakInterner(); @@ -35,6 +41,9 @@ private DataSegmentInterner() public static DataSegment intern(DataSegment segment) { + // A segment learns its size and dimensions when it moves from a realtime to historical server + // for that reason, we are using its size as the indicator to decide whether to use REALTIME or + // HISTORICAL interner. return segment.getSize() > 0 ? HISTORICAL_INTERNER.intern(segment) : REALTIME_INTERNER.intern(segment); } diff --git a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java index 5916ad6c8b1c..4e23b047d8ba 100644 --- a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java +++ b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java @@ -32,7 +32,7 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; -import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; import org.apache.druid.timeline.DataSegment; @@ -43,12 +43,10 @@ import java.io.InputStream; import java.util.Iterator; import java.util.Set; -import java.util.concurrent.Callable; import 
java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * This class polls the coordinator in background to keep the latest published segments. @@ -58,15 +56,15 @@ public class MetadataSegmentView { - private static final int DEFAULT_POLL_PERIOD_IN_MS = 60000; - private static final Logger log = new Logger(MetadataSegmentView.class); + private static final long POLL_PERIOD_IN_MS = 60000; + private static final EmittingLogger log = new EmittingLogger(MetadataSegmentView.class); private final DruidLeaderClient coordinatorDruidLeaderClient; private final ObjectMapper jsonMapper; private final BytesAccumulatingResponseHandler responseHandler; private final BrokerSegmentWatcherConfig segmentWatcherConfig; - private final ConcurrentMap publishedSegments = new ConcurrentHashMap<>(); + private final ConcurrentMap publishedSegments = new ConcurrentHashMap<>(1000); private ScheduledExecutorService scheduledExec; @Inject @@ -118,20 +116,8 @@ private void poll() // since the presence of a segment with an earlier timestamp indicates that // "that" segment is not returned by coordinator in latest poll, so it's // likely deleted and therefore we remove it from publishedSegments - final Set toBeRemovedSegments = publishedSegments.values() - .stream() - .filter(v -> v != timestamp) - .collect(Collectors.toSet()); - publishedSegments.values().removeAll(toBeRemovedSegments); + publishedSegments.entrySet().removeIf(e -> e.getValue() != timestamp); - if (segmentWatcherConfig.getWatchedDataSources() != null) { - log.debug( - "filtering datasources[%s] in published segments based on broker's watchedDataSources", - segmentWatcherConfig.getWatchedDataSources() - ); - publishedSegments.keySet() - .removeIf(key -> !segmentWatcherConfig.getWatchedDataSources().contains(key.getDataSource())); - } } public Iterator getPublishedSegments() @@ -149,11 +135,13 @@ private static 
JsonParserIterator getMetadataSegments( { String query = "/druid/coordinator/v1/metadata/segments"; if (watchedDataSources != null && !watchedDataSources.isEmpty()) { + log.debug( + "filtering datasources in published segments based on broker's watchedDataSources[%s]", watchedDataSources); final StringBuilder sb = new StringBuilder(); for (String ds : watchedDataSources) { sb.append("datasources=" + ds + "&"); } - sb.setLength(Math.max(sb.length() - 1, 0)); + sb.setLength(sb.length() - 1); query = "/druid/coordinator/v1/metadata/segments?" + sb; } Request request; @@ -186,14 +174,28 @@ private static JsonParserIterator getMetadataSegments( ); } - private class PollTask implements Callable + private class PollTask implements Runnable { @Override - public Void call() + public void run() { - poll(); - scheduledExec.schedule(new PollTask(), DEFAULT_POLL_PERIOD_IN_MS, TimeUnit.MILLISECONDS); - return null; + long delayMS = POLL_PERIOD_IN_MS; + try { + long pollStartTime = System.nanoTime(); + poll(); + long pollEndTime = System.nanoTime(); + final long pollTimeNs = pollEndTime - pollStartTime; + final long pollTimeMs = TimeUnit.NANOSECONDS.toMillis(pollTimeNs); + if (pollTimeMs > POLL_PERIOD_IN_MS) { + delayMS = 0; + } else { + delayMS = pollTimeMs; + } + } + catch (Exception e) { + log.makeAlert(e, "Problem polling Coordinator.").emit(); + } + scheduledExec.schedule(new PollTask(), delayMS, TimeUnit.MILLISECONDS); } } From d5b7d79373841888cdb276f22403681e489033f1 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Wed, 30 Jan 2019 19:59:23 -0800 Subject: [PATCH 12/26] PR comments --- .../druid/client/MetadataSegmentView.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java index 4e23b047d8ba..31cd67c4499e 100644 --- a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java +++ 
b/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java @@ -98,7 +98,6 @@ public void stop() private void poll() { log.info("polling published segments from coordinator"); - //get authorized published segments from coordinator final JsonParserIterator metadataSegments = getMetadataSegments( coordinatorDruidLeaderClient, jsonMapper, @@ -116,6 +115,11 @@ private void poll() // since the presence of a segment with an earlier timestamp indicates that // "that" segment is not returned by coordinator in latest poll, so it's // likely deleted and therefore we remove it from publishedSegments + // Segments are not atomically replaced, because that can cause a high + // memory footprint due to the large number of published segments. Instead, + // deleted segments are incrementally removed from the map. + // This means publishedSegments will be eventually consistent with + // the segments in coordinator publishedSegments.entrySet().removeIf(e -> e.getValue() != timestamp); } @@ -139,7 +143,7 @@ private static JsonParserIterator getMetadataSegments( "filtering datasources in published segments based on broker's watchedDataSources[%s]", watchedDataSources); final StringBuilder sb = new StringBuilder(); for (String ds : watchedDataSources) { - sb.append("datasources=" + ds + "&"); + sb.append("datasources=").append(ds).append("&"); } sb.setLength(sb.length() - 1); query = "/druid/coordinator/v1/metadata/segments?" 
+ sb; @@ -181,16 +185,12 @@ public void run() { long delayMS = POLL_PERIOD_IN_MS; try { - long pollStartTime = System.nanoTime(); + final long pollStartTime = System.nanoTime(); poll(); - long pollEndTime = System.nanoTime(); - final long pollTimeNs = pollEndTime - pollStartTime; - final long pollTimeMs = TimeUnit.NANOSECONDS.toMillis(pollTimeNs); - if (pollTimeMs > POLL_PERIOD_IN_MS) { - delayMS = 0; - } else { - delayMS = pollTimeMs; - } + final long pollEndTime = System.nanoTime(); + final long pollTimeNS = pollEndTime - pollStartTime; + final long pollTimeMS = TimeUnit.NANOSECONDS.toMillis(pollTimeNS); + delayMS = Math.max(POLL_PERIOD_IN_MS - pollTimeMS, 0); } catch (Exception e) { log.makeAlert(e, "Problem polling Coordinator.").emit(); From 632d741e83da325c3a9a2feec4c8205b1db4b65e Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 31 Jan 2019 17:01:23 -0800 Subject: [PATCH 13/26] Make the segment cache in broker off by default * Added a config to PlannerConfig * Moved MetadataSegmentView to sql module --- .../druid/benchmark/query/SqlBenchmark.java | 2 +- .../sql/QuantileSqlAggregatorTest.java | 2 +- .../sql/calcite/planner/PlannerConfig.java | 9 ++++++ .../calcite/schema}/MetadataSegmentView.java | 29 +++++++++++++++---- .../sql/calcite/schema/SystemSchema.java | 3 +- .../sql/avatica/DruidAvaticaHandlerTest.java | 4 +-- .../druid/sql/avatica/DruidStatementTest.java | 2 +- .../sql/calcite/BaseCalciteQueryTest.java | 2 +- .../sql/calcite/http/SqlResourceTest.java | 2 +- .../sql/calcite/schema/SystemSchemaTest.java | 1 - .../druid/sql/calcite/util/CalciteTests.java | 8 +++-- 11 files changed, 46 insertions(+), 18 deletions(-) rename {server/src/main/java/org/apache/druid/client => sql/src/main/java/org/apache/druid/sql/calcite/schema}/MetadataSegmentView.java (86%) diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java index 
bb19188fc4e5..1f5158827532 100644 --- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java +++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SqlBenchmark.java @@ -114,7 +114,7 @@ public void setup() final QueryRunnerFactoryConglomerate conglomerate = conglomerateCloserPair.lhs; final PlannerConfig plannerConfig = new PlannerConfig(); final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig); - final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker); + final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig); this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate).add(dataSegment, index); final PlannerFactory plannerFactory = new PlannerFactory( druidSchema, diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java index 69337b568f60..7aa4e6733ea2 100644 --- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java +++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java @@ -157,7 +157,7 @@ public void setUp() throws Exception final PlannerConfig plannerConfig = new PlannerConfig(); final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig); - final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker); + final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig); final DruidOperatorTable operatorTable = new DruidOperatorTable( ImmutableSet.of(new QuantileSqlAggregator()), ImmutableSet.of() diff --git 
a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java index fe9e72ffb4be..8673538e1c07 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java @@ -66,6 +66,14 @@ public class PlannerConfig @JsonProperty private DateTimeZone sqlTimeZone = DateTimeZone.UTC; + @JsonProperty + private boolean metadataSegmentCacheEnable = false; + + public boolean isMetadataSegmentCacheEnable() + { + return metadataSegmentCacheEnable; + } + public Period getMetadataRefreshPeriod() { return metadataRefreshPeriod; @@ -151,6 +159,7 @@ public PlannerConfig withOverrides(final Map context) newConfig.requireTimeCondition = isRequireTimeCondition(); newConfig.sqlTimeZone = getSqlTimeZone(); newConfig.awaitInitializationOnStart = isAwaitInitializationOnStart(); + newConfig.metadataSegmentCacheEnable = isMetadataSegmentCacheEnable(); return newConfig; } diff --git a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java similarity index 86% rename from server/src/main/java/org/apache/druid/client/MetadataSegmentView.java rename to sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 31cd67c4499e..738c8c7bba65 100644 --- a/server/src/main/java/org/apache/druid/client/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -17,13 +17,17 @@ * under the License. 
*/ -package org.apache.druid.client; +package org.apache.druid.sql.calcite.schema; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; +import org.apache.druid.client.BrokerSegmentWatcherConfig; +import org.apache.druid.client.DataSegmentInterner; +import org.apache.druid.client.JsonParserIterator; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.guice.ManageLifecycle; @@ -35,6 +39,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.Request; import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler; +import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.timeline.DataSegment; import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.DateTime; @@ -66,15 +71,18 @@ public class MetadataSegmentView private final ConcurrentMap publishedSegments = new ConcurrentHashMap<>(1000); private ScheduledExecutorService scheduledExec; + final PlannerConfig plannerConfig; @Inject public MetadataSegmentView( final @Coordinator DruidLeaderClient druidLeaderClient, final ObjectMapper jsonMapper, final BytesAccumulatingResponseHandler responseHandler, - final BrokerSegmentWatcherConfig segmentWatcherConfig + final BrokerSegmentWatcherConfig segmentWatcherConfig, + final PlannerConfig plannerConfig ) { + this.plannerConfig = Preconditions.checkNotNull(plannerConfig, "plannerConfig"); this.coordinatorDruidLeaderClient = druidLeaderClient; this.jsonMapper = jsonMapper; this.responseHandler = responseHandler; @@ -84,8 +92,10 @@ public MetadataSegmentView( @LifecycleStart public void start() { - scheduledExec = 
Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); - scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS); + if (plannerConfig.isMetadataSegmentCacheEnable()) { + scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); + scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS); + } } @LifecycleStop @@ -126,7 +136,16 @@ private void poll() public Iterator getPublishedSegments() { - return publishedSegments.keySet().iterator(); + if (plannerConfig.isMetadataSegmentCacheEnable()) { + return publishedSegments.keySet().iterator(); + } else { + return getMetadataSegments( + coordinatorDruidLeaderClient, + jsonMapper, + responseHandler, + segmentWatcherConfig.getWatchedDataSources() + ); + } } // Note that coordinator must be up to get segments diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 969bc46456c0..d0599f861903 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -45,7 +45,6 @@ import org.apache.calcite.schema.impl.AbstractTable; import org.apache.druid.client.ImmutableDruidServer; import org.apache.druid.client.JsonParserIterator; -import org.apache.druid.client.MetadataSegmentView; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingService; @@ -229,7 +228,7 @@ public Enumerable scan(DataContext root) partialSegmentDataMap.put(h.getSegmentId(), partialSegmentData); } - //get published segments from metadata segment cache + //get published segments from metadata segment cache (if enabled in sql planner config), else directly from coordinator final Iterator metadataSegments = metadataView.getPublishedSegments(); final Set segmentsAlreadySeen = new HashSet<>(); diff --git 
a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java index c1c43b5ad1ea..4f25a66c1139 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -159,7 +159,7 @@ public void setUp() throws Exception walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder()); final PlannerConfig plannerConfig = new PlannerConfig(); final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig); - final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker); + final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); @@ -790,7 +790,7 @@ public int getMaxRowsPerFrame() final PlannerConfig plannerConfig = new PlannerConfig(); final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig); - final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker); + final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); final List frames = new ArrayList<>(); diff --git a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java index 74386125ef1f..9cc2ecf4cabb 100644 --- a/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java +++ b/sql/src/test/java/org/apache/druid/sql/avatica/DruidStatementTest.java @@ -88,7 +88,7 @@ public void setUp() 
throws Exception walker = CalciteTests.createMockWalker(conglomerate, temporaryFolder.newFolder()); final PlannerConfig plannerConfig = new PlannerConfig(); final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig); - final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker); + final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); final PlannerFactory plannerFactory = new PlannerFactory( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java index b5d083100c75..1795cfb5b64a 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java @@ -539,7 +539,7 @@ public List getResults( { final InProcessViewManager viewManager = new InProcessViewManager(CalciteTests.TEST_AUTHENTICATOR_ESCALATOR); final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig, viewManager); - final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker); + final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig); final PlannerFactory plannerFactory = new PlannerFactory( diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java index 8b1c9aae0caf..6e241a5aad93 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/http/SqlResourceTest.java @@ -114,7 +114,7 @@ public void setUp() throws Exception final PlannerConfig 
plannerConfig = new PlannerConfig(); final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig); - final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker); + final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig); final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); req = EasyMock.createStrictMock(HttpServletRequest.class); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index b4d0743d6ea1..7d8cdaad5729 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -37,7 +37,6 @@ import org.apache.druid.client.DruidServer; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; -import org.apache.druid.client.MetadataSegmentView; import org.apache.druid.client.TimelineServerView; import org.apache.druid.data.input.InputRow; import org.apache.druid.discovery.DruidLeaderClient; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 342912211f06..5beafb7bf23c 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -32,7 +32,6 @@ import com.google.inject.Module; import org.apache.curator.x.discovery.ServiceProvider; import org.apache.druid.client.BrokerSegmentWatcherConfig; -import org.apache.druid.client.MetadataSegmentView; import org.apache.druid.collections.CloseableStupidPool; import org.apache.druid.curator.discovery.ServerDiscoverySelector; import 
org.apache.druid.data.input.InputRow; @@ -126,6 +125,7 @@ import org.apache.druid.sql.calcite.planner.PlannerConfig; import org.apache.druid.sql.calcite.planner.PlannerFactory; import org.apache.druid.sql.calcite.schema.DruidSchema; +import org.apache.druid.sql.calcite.schema.MetadataSegmentView; import org.apache.druid.sql.calcite.schema.SystemSchema; import org.apache.druid.sql.calcite.view.NoopViewManager; import org.apache.druid.sql.calcite.view.ViewManager; @@ -742,7 +742,8 @@ public static InputRow createRow(final Object t, final String dim1, final String public static SystemSchema createMockSystemSchema( final DruidSchema druidSchema, - final SpecificSegmentsQuerySegmentWalker walker + final SpecificSegmentsQuerySegmentWalker walker, + final PlannerConfig plannerConfig ) { final DruidLeaderClient druidLeaderClient = new DruidLeaderClient( @@ -760,7 +761,8 @@ public static SystemSchema createMockSystemSchema( druidLeaderClient, getJsonMapper(), new BytesAccumulatingResponseHandler(), - new BrokerSegmentWatcherConfig() + new BrokerSegmentWatcherConfig(), + plannerConfig ), new TestServerInventoryView(walker.getSegments()), TEST_AUTHORIZER_MAPPER, From c41f085ee44ad2e46cf09a5862d6e368ef782587 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 31 Jan 2019 17:19:17 -0800 Subject: [PATCH 14/26] Add doc for new planner config --- .../org/apache/druid/sql/calcite/planner/PlannerConfig.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java index 8673538e1c07..8ff73406ee89 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java @@ -66,6 +66,10 @@ public class PlannerConfig @JsonProperty private DateTimeZone sqlTimeZone = DateTimeZone.UTC; + /** + * If this config is set to true, broker would cache the 
published segments, + * which can make your sys table queries faster + */ @JsonProperty private boolean metadataSegmentCacheEnable = false; From 07a80b11d19f73b968688cd014a30351f83357f5 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Thu, 31 Jan 2019 18:36:41 -0800 Subject: [PATCH 15/26] Update documentation --- docs/content/querying/sql.md | 1 + .../org/apache/druid/sql/calcite/planner/PlannerConfig.java | 4 ---- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index 6c9674c66eca..95f6e7a54022 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -697,6 +697,7 @@ The Druid SQL server is configured through the following properties on the Broke |`druid.sql.planner.useFallback`|Whether to evaluate operations on the Broker when they cannot be expressed as Druid queries. This option is not recommended for production since it can generate unscalable query plans. If false, SQL queries that cannot be translated to Druid queries will fail.|false| |`druid.sql.planner.requireTimeCondition`|Whether to require SQL to have filter conditions on __time column so that all generated native queries will have user specified intervals. If true, all queries without filter condition on __time column will fail|false| |`druid.sql.planner.sqlTimeZone`|Sets the default time zone for the server, which will affect how time functions and timestamp literals behave. Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|UTC| +|`druid.sql.planner.metadataSegmentCacheEnable`|Whether to keep a cache of published segments in broker. If true, broker polls coordinator in background to get segments from metadata store and maintains a local cache.
If false, coordinator's REST api will be invoked when broker needs published segments info.|false| ## SQL Metrics diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java index 8ff73406ee89..8673538e1c07 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java @@ -66,10 +66,6 @@ public class PlannerConfig @JsonProperty private DateTimeZone sqlTimeZone = DateTimeZone.UTC; - /** - * If this config is set to true, broker would cache the published segments, - * which can make your sys table queries faster - */ @JsonProperty private boolean metadataSegmentCacheEnable = false; From a440c0cdd20faf3e3d65ec4789232ac9407b2e4b Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Feb 2019 08:16:16 -0800 Subject: [PATCH 16/26] PR comments --- docs/content/querying/sql.md | 1 + .../druid/client/DataSegmentInterner.java | 3 +- .../sql/calcite/planner/PlannerConfig.java | 17 ++++++- .../calcite/schema/MetadataSegmentView.java | 46 ++++++++++++++----- 4 files changed, 52 insertions(+), 15 deletions(-) diff --git a/docs/content/querying/sql.md b/docs/content/querying/sql.md index 95f6e7a54022..36391f5a5325 100644 --- a/docs/content/querying/sql.md +++ b/docs/content/querying/sql.md @@ -698,6 +698,7 @@ The Druid SQL server is configured through the following properties on the Broke |`druid.sql.planner.requireTimeCondition`|Whether to require SQL to have filter conditions on __time column so that all generated native queries will have user specified intervals. If true, all queries without filter condition on __time column will fail|false| |`druid.sql.planner.sqlTimeZone`|Sets the default time zone for the server, which will affect how time functions and timestamp literals behave.
Should be a time zone name like "America/Los_Angeles" or offset like "-08:00".|UTC| |`druid.sql.planner.metadataSegmentCacheEnable`|Whether to keep a cache of published segments in broker. If true, broker polls coordinator in background to get segments from metadata store and maintains a local cache. If false, coordinator's REST api will be invoked when broker needs published segments info.|false| +|`druid.sql.planner.metadataSegmentPollPeriod`|How often to poll coordinator for published segments list if `druid.sql.planner.metadataSegmentCacheEnable` is set to true. Poll period is in milliseconds.|60000| ## SQL Metrics diff --git a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java index e2e9608874c9..11d104d8604f 100644 --- a/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java +++ b/server/src/main/java/org/apache/druid/client/DataSegmentInterner.java @@ -36,7 +36,7 @@ public class DataSegmentInterner private DataSegmentInterner() { - + //No instantiation } public static DataSegment intern(DataSegment segment) @@ -46,5 +46,4 @@ public static DataSegment intern(DataSegment segment) // HISTORICAL interner. return segment.getSize() > 0 ?
HISTORICAL_INTERNER.intern(segment) : REALTIME_INTERNER.intern(segment); } - } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java index 8673538e1c07..d504f5ca3b76 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/planner/PlannerConfig.java @@ -69,6 +69,14 @@ public class PlannerConfig @JsonProperty private boolean metadataSegmentCacheEnable = false; + @JsonProperty + private long metadataSegmentPollPeriod = 60000; + + public long getMetadataSegmentPollPeriod() + { + return metadataSegmentPollPeriod; + } + public boolean isMetadataSegmentCacheEnable() { return metadataSegmentCacheEnable; @@ -160,6 +168,7 @@ public PlannerConfig withOverrides(final Map context) newConfig.sqlTimeZone = getSqlTimeZone(); newConfig.awaitInitializationOnStart = isAwaitInitializationOnStart(); newConfig.metadataSegmentCacheEnable = isMetadataSegmentCacheEnable(); + newConfig.metadataSegmentPollPeriod = getMetadataSegmentPollPeriod(); return newConfig; } @@ -200,6 +209,8 @@ public boolean equals(final Object o) useFallback == that.useFallback && requireTimeCondition == that.requireTimeCondition && awaitInitializationOnStart == that.awaitInitializationOnStart && + metadataSegmentCacheEnable == that.metadataSegmentCacheEnable && + metadataSegmentPollPeriod == that.metadataSegmentPollPeriod && Objects.equals(metadataRefreshPeriod, that.metadataRefreshPeriod) && Objects.equals(sqlTimeZone, that.sqlTimeZone); } @@ -219,7 +230,9 @@ public int hashCode() useFallback, requireTimeCondition, awaitInitializationOnStart, - sqlTimeZone + sqlTimeZone, + metadataSegmentCacheEnable, + metadataSegmentPollPeriod ); } @@ -237,6 +250,8 @@ public String toString() ", useFallback=" + useFallback + ", requireTimeCondition=" + requireTimeCondition + ", awaitInitializationOnStart=" + awaitInitializationOnStart + + 
", metadataSegmentCacheEnable=" + metadataSegmentCacheEnable + + ", metadataSegmentPollPeriod=" + metadataSegmentPollPeriod + ", sqlTimeZone=" + sqlTimeZone + '}'; } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 738c8c7bba65..a5a32f128ae0 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -29,9 +29,11 @@ import org.apache.druid.client.DataSegmentInterner; import org.apache.druid.client.JsonParserIterator; import org.apache.druid.client.coordinator.Coordinator; +import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DruidLeaderClient; import org.apache.druid.guice.ManageLifecycle; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; @@ -61,7 +63,6 @@ public class MetadataSegmentView { - private static final long POLL_PERIOD_IN_MS = 60000; private static final EmittingLogger log = new EmittingLogger(MetadataSegmentView.class); private final DruidLeaderClient coordinatorDruidLeaderClient; @@ -69,9 +70,11 @@ public class MetadataSegmentView private final BytesAccumulatingResponseHandler responseHandler; private final BrokerSegmentWatcherConfig segmentWatcherConfig; - private final ConcurrentMap publishedSegments = new ConcurrentHashMap<>(1000); + private final boolean isCacheEnabled; + private final ConcurrentMap publishedSegments; private ScheduledExecutorService scheduledExec; - final PlannerConfig plannerConfig; + private final long pollPeriodinMS; + private LifecycleLock lifecycleLock = new LifecycleLock(); @Inject public MetadataSegmentView( @@ -82,27 
+85,46 @@ public MetadataSegmentView( final PlannerConfig plannerConfig ) { - this.plannerConfig = Preconditions.checkNotNull(plannerConfig, "plannerConfig"); + Preconditions.checkNotNull(plannerConfig, "plannerConfig"); this.coordinatorDruidLeaderClient = druidLeaderClient; this.jsonMapper = jsonMapper; this.responseHandler = responseHandler; this.segmentWatcherConfig = segmentWatcherConfig; + this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable(); + this.pollPeriodinMS = plannerConfig.getMetadataSegmentPollPeriod(); + this.publishedSegments = isCacheEnabled ? new ConcurrentHashMap<>(1000) : new ConcurrentHashMap<>(); } @LifecycleStart public void start() { - if (plannerConfig.isMetadataSegmentCacheEnable()) { - scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); - scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS); + if (!lifecycleLock.canStart()) { + throw new ISE("can't start."); + } + try { + if (isCacheEnabled) { + scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); + scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS); + lifecycleLock.started(); + log.info("MetadataSegmentView Started."); + } + } + finally { + lifecycleLock.exitStart(); } } @LifecycleStop public void stop() { - scheduledExec.shutdownNow(); - scheduledExec = null; + if (!lifecycleLock.canStop()) { + throw new ISE("can't stop."); + } + if (isCacheEnabled) { + scheduledExec.shutdownNow(); + scheduledExec = null; + } + log.info("MetadataSegmentView Stopped."); } private void poll() @@ -136,7 +158,7 @@ private void poll() public Iterator getPublishedSegments() { - if (plannerConfig.isMetadataSegmentCacheEnable()) { + if (isCacheEnabled) { return publishedSegments.keySet().iterator(); } else { return getMetadataSegments( @@ -202,14 +224,14 @@ private class PollTask implements Runnable @Override public void run() { - long delayMS = POLL_PERIOD_IN_MS; + long delayMS = pollPeriodinMS; try { final long 
pollStartTime = System.nanoTime(); poll(); final long pollEndTime = System.nanoTime(); final long pollTimeNS = pollEndTime - pollStartTime; final long pollTimeMS = TimeUnit.NANOSECONDS.toMillis(pollTimeNS); - delayMS = Math.max(POLL_PERIOD_IN_MS - pollTimeMS, 0); + delayMS = Math.max(pollPeriodinMS - pollTimeMS, 0); } catch (Exception e) { log.makeAlert(e, "Problem polling Coordinator.").emit(); From 981b08081f41d239887db63c169332b4fe848ff5 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Feb 2019 10:20:44 -0800 Subject: [PATCH 17/26] some more changes --- .../sql/calcite/schema/MetadataSegmentView.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index a5a32f128ae0..6aa61b9d012e 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -72,7 +72,7 @@ public class MetadataSegmentView private final boolean isCacheEnabled; private final ConcurrentMap publishedSegments; - private ScheduledExecutorService scheduledExec; + private final ScheduledExecutorService scheduledExec; private final long pollPeriodinMS; private LifecycleLock lifecycleLock = new LifecycleLock(); @@ -92,7 +92,8 @@ public MetadataSegmentView( this.segmentWatcherConfig = segmentWatcherConfig; this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable(); this.pollPeriodinMS = plannerConfig.getMetadataSegmentPollPeriod(); - this.publishedSegments = isCacheEnabled ? new ConcurrentHashMap<>(1000) : new ConcurrentHashMap<>(); + this.publishedSegments = isCacheEnabled ? 
new ConcurrentHashMap<>(1000) : null; + this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); } @LifecycleStart @@ -103,7 +104,6 @@ public void start() } try { if (isCacheEnabled) { - scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS); lifecycleLock.started(); log.info("MetadataSegmentView Started."); @@ -121,10 +121,10 @@ public void stop() throw new ISE("can't stop."); } if (isCacheEnabled) { - scheduledExec.shutdownNow(); - scheduledExec = null; + log.info("MetadataSegmentView is stopping."); + scheduledExec.shutdown(); + log.info("MetadataSegmentView Stopped."); } - log.info("MetadataSegmentView Stopped."); } private void poll() From 3a94cae8a45f4f6cee927650ef8a595d9506b792 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Feb 2019 14:02:08 -0800 Subject: [PATCH 18/26] PR comments --- .../calcite/schema/MetadataSegmentView.java | 47 ++++++++++--------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 6aa61b9d012e..406a5e4c0421 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -99,31 +99,35 @@ public MetadataSegmentView( @LifecycleStart public void start() { - if (!lifecycleLock.canStart()) { - throw new ISE("can't start."); - } - try { - if (isCacheEnabled) { - scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS); - lifecycleLock.started(); - log.info("MetadataSegmentView Started."); + synchronized (lifecycleLock) { + if (!lifecycleLock.canStart()) { + throw new ISE("can't start."); + } + try { + if (isCacheEnabled) { + scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS); + lifecycleLock.started(); + 
log.info("MetadataSegmentView Started."); + } + } + finally { + lifecycleLock.exitStart(); } - } - finally { - lifecycleLock.exitStart(); } } @LifecycleStop public void stop() { - if (!lifecycleLock.canStop()) { - throw new ISE("can't stop."); - } - if (isCacheEnabled) { - log.info("MetadataSegmentView is stopping."); - scheduledExec.shutdown(); - log.info("MetadataSegmentView Stopped."); + synchronized (lifecycleLock) { + if (!lifecycleLock.canStop()) { + throw new ISE("can't stop."); + } + if (isCacheEnabled) { + log.info("MetadataSegmentView is stopping."); + scheduledExec.shutdown(); + log.info("MetadataSegmentView Stopped."); + } } } @@ -224,19 +228,20 @@ private class PollTask implements Runnable @Override public void run() { - long delayMS = pollPeriodinMS; try { final long pollStartTime = System.nanoTime(); poll(); final long pollEndTime = System.nanoTime(); final long pollTimeNS = pollEndTime - pollStartTime; final long pollTimeMS = TimeUnit.NANOSECONDS.toMillis(pollTimeNS); - delayMS = Math.max(pollPeriodinMS - pollTimeMS, 0); + final long delayMS = Math.max(pollPeriodinMS - pollTimeMS, 0); + if (!Thread.currentThread().isInterrupted()) { + scheduledExec.schedule(new PollTask(), delayMS, TimeUnit.MILLISECONDS); + } } catch (Exception e) { log.makeAlert(e, "Problem polling Coordinator.").emit(); } - scheduledExec.schedule(new PollTask(), delayMS, TimeUnit.MILLISECONDS); } } From ad284581b56f2029647787bbb9b55d0a2ea8d557 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Feb 2019 14:47:22 -0800 Subject: [PATCH 19/26] fix test --- .../aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java index 4641e2748997..6b218bbcb44c 100644 --- 
a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java +++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/aggregation/bloom/sql/BloomFilterSqlAggregatorTest.java @@ -192,7 +192,7 @@ public void setUp() throws Exception final PlannerConfig plannerConfig = new PlannerConfig(); final DruidSchema druidSchema = CalciteTests.createMockSchema(conglomerate, walker, plannerConfig); - final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker); + final SystemSchema systemSchema = CalciteTests.createMockSystemSchema(druidSchema, walker, plannerConfig); final DruidOperatorTable operatorTable = new DruidOperatorTable( ImmutableSet.of(new BloomFilterSqlAggregator()), ImmutableSet.of() From 2ddb7a15ca1a41dda356c7c1ef9606be7ae2c83f Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Feb 2019 15:23:55 -0800 Subject: [PATCH 20/26] remove unintentional change, whether to synchronize on lifecycleLock is still in discussion in PR --- .../calcite/schema/MetadataSegmentView.java | 40 +++++++++---------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 406a5e4c0421..fc5f3bae5d78 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -99,35 +99,31 @@ public MetadataSegmentView( @LifecycleStart public void start() { - synchronized (lifecycleLock) { - if (!lifecycleLock.canStart()) { - throw new ISE("can't start."); - } - try { - if (isCacheEnabled) { - scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS); - lifecycleLock.started(); - log.info("MetadataSegmentView Started."); - } - } - finally { - lifecycleLock.exitStart(); + if 
(!lifecycleLock.canStart()) { + throw new ISE("can't start."); + } + try { + if (isCacheEnabled) { + scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS); + lifecycleLock.started(); + log.info("MetadataSegmentView Started."); } } + finally { + lifecycleLock.exitStart(); + } } @LifecycleStop public void stop() { - synchronized (lifecycleLock) { - if (!lifecycleLock.canStop()) { - throw new ISE("can't stop."); - } - if (isCacheEnabled) { - log.info("MetadataSegmentView is stopping."); - scheduledExec.shutdown(); - log.info("MetadataSegmentView Stopped."); - } + if (!lifecycleLock.canStop()) { + throw new ISE("can't stop."); + } + if (isCacheEnabled) { + log.info("MetadataSegmentView is stopping."); + scheduledExec.shutdown(); + log.info("MetadataSegmentView Stopped."); } } From 1b3749306f133749c0c0a154a193d94732de84cd Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Feb 2019 15:31:10 -0800 Subject: [PATCH 21/26] minor changes --- .../apache/druid/sql/calcite/schema/MetadataSegmentView.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index fc5f3bae5d78..ea3265fe865b 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -46,6 +46,7 @@ import org.jboss.netty.handler.codec.http.HttpMethod; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.util.Iterator; @@ -71,10 +72,11 @@ public class MetadataSegmentView private final BrokerSegmentWatcherConfig segmentWatcherConfig; private final boolean isCacheEnabled; + @Nullable private final ConcurrentMap publishedSegments; private final ScheduledExecutorService scheduledExec; private final long pollPeriodinMS; - private 
LifecycleLock lifecycleLock = new LifecycleLock(); + private final LifecycleLock lifecycleLock = new LifecycleLock(); @Inject public MetadataSegmentView( From 60dbf41bd887f178c4a85da83d6abe4447061b7b Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Feb 2019 16:42:23 -0800 Subject: [PATCH 22/26] some changes to initialization --- .../calcite/schema/MetadataSegmentView.java | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index ea3265fe865b..9e6555c07b9e 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -106,10 +106,15 @@ public void start() } try { if (isCacheEnabled) { - scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS); - lifecycleLock.started(); - log.info("MetadataSegmentView Started."); + try { + poll(); + } + finally { + scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS); + } } + lifecycleLock.started(); + log.info("MetadataSegmentView Started."); } finally { lifecycleLock.exitStart(); @@ -161,6 +166,10 @@ private void poll() public Iterator getPublishedSegments() { if (isCacheEnabled) { + Preconditions.checkState( + lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS), + "hold on, still syncing published segments" + ); return publishedSegments.keySet().iterator(); } else { return getMetadataSegments( @@ -173,7 +182,7 @@ public Iterator getPublishedSegments() } // Note that coordinator must be up to get segments - private static JsonParserIterator getMetadataSegments( + private JsonParserIterator getMetadataSegments( DruidLeaderClient coordinatorClient, ObjectMapper jsonMapper, BytesAccumulatingResponseHandler responseHandler, @@ -226,20 +235,23 @@ private class PollTask implements Runnable 
@Override public void run() { + long delayMS = pollPeriodinMS; try { final long pollStartTime = System.nanoTime(); poll(); final long pollEndTime = System.nanoTime(); final long pollTimeNS = pollEndTime - pollStartTime; final long pollTimeMS = TimeUnit.NANOSECONDS.toMillis(pollTimeNS); - final long delayMS = Math.max(pollPeriodinMS - pollTimeMS, 0); - if (!Thread.currentThread().isInterrupted()) { - scheduledExec.schedule(new PollTask(), delayMS, TimeUnit.MILLISECONDS); - } + delayMS = Math.max(pollPeriodinMS - pollTimeMS, 0); } catch (Exception e) { log.makeAlert(e, "Problem polling Coordinator.").emit(); } + finally { + if (!Thread.currentThread().isInterrupted()) { + scheduledExec.schedule(new PollTask(), delayMS, TimeUnit.MILLISECONDS); + } + } } } From 8860053f7f0271d9b687e246d002d1ec64cfc0a2 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Feb 2019 17:00:02 -0800 Subject: [PATCH 23/26] use pollPeriodInMS --- .../sql/calcite/schema/MetadataSegmentView.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 9e6555c07b9e..918bf2b35cee 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -75,7 +75,7 @@ public class MetadataSegmentView @Nullable private final ConcurrentMap publishedSegments; private final ScheduledExecutorService scheduledExec; - private final long pollPeriodinMS; + private final long pollPeriodInMS; private final LifecycleLock lifecycleLock = new LifecycleLock(); @Inject @@ -93,7 +93,7 @@ public MetadataSegmentView( this.responseHandler = responseHandler; this.segmentWatcherConfig = segmentWatcherConfig; this.isCacheEnabled = plannerConfig.isMetadataSegmentCacheEnable(); - this.pollPeriodinMS = 
plannerConfig.getMetadataSegmentPollPeriod(); + this.pollPeriodInMS = plannerConfig.getMetadataSegmentPollPeriod(); this.publishedSegments = isCacheEnabled ? new ConcurrentHashMap<>(1000) : null; this.scheduledExec = Execs.scheduledSingleThreaded("MetadataSegmentView-Cache--%d"); } @@ -109,8 +109,11 @@ public void start() try { poll(); } + catch (Exception e) { + log.makeAlert(e, "Problem polling Coordinator.").emit(); + } finally { - scheduledExec.schedule(new PollTask(), 0, TimeUnit.MILLISECONDS); + scheduledExec.schedule(new PollTask(), pollPeriodInMS, TimeUnit.MILLISECONDS); } } lifecycleLock.started(); @@ -235,14 +238,14 @@ private class PollTask implements Runnable @Override public void run() { - long delayMS = pollPeriodinMS; + long delayMS = pollPeriodInMS; try { final long pollStartTime = System.nanoTime(); poll(); final long pollEndTime = System.nanoTime(); final long pollTimeNS = pollEndTime - pollStartTime; final long pollTimeMS = TimeUnit.NANOSECONDS.toMillis(pollTimeNS); - delayMS = Math.max(pollPeriodinMS - pollTimeMS, 0); + delayMS = Math.max(pollPeriodInMS - pollTimeMS, 0); } catch (Exception e) { log.makeAlert(e, "Problem polling Coordinator.").emit(); From ca21779db5ce2ee95a925e5e376674b27df123cd Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Feb 2019 17:58:40 -0800 Subject: [PATCH 24/26] Add boolean cachePopulated to check if first poll succeeds --- .../druid/sql/calcite/schema/MetadataSegmentView.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 918bf2b35cee..f619bc201d15 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -55,6 +55,7 @@ import java.util.concurrent.ConcurrentMap; import 
java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** * This class polls the coordinator in background to keep the latest published segments. @@ -77,6 +78,7 @@ public class MetadataSegmentView private final ScheduledExecutorService scheduledExec; private final long pollPeriodInMS; private final LifecycleLock lifecycleLock = new LifecycleLock(); + private final AtomicBoolean cachePopulated = new AtomicBoolean(false); @Inject public MetadataSegmentView( @@ -163,14 +165,14 @@ private void poll() // This means publishedSegments will be eventually consistent with // the segments in coordinator publishedSegments.entrySet().removeIf(e -> e.getValue() != timestamp); - + cachePopulated.set(true); } public Iterator getPublishedSegments() { if (isCacheEnabled) { Preconditions.checkState( - lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS), + lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS) && cachePopulated.get(), "hold on, still syncing published segments" ); return publishedSegments.keySet().iterator(); From 3a70f390d6b5942952e29d43fbc27b12440659c5 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Feb 2019 18:13:04 -0800 Subject: [PATCH 25/26] Remove poll from start() --- .../druid/sql/calcite/schema/MetadataSegmentView.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index f619bc201d15..8e4cec4ef4c0 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -108,15 +108,7 @@ public void start() } try { if (isCacheEnabled) { - try { - poll(); - } - catch (Exception e) { - log.makeAlert(e, "Problem polling Coordinator.").emit(); - } - finally { - 
scheduledExec.schedule(new PollTask(), pollPeriodInMS, TimeUnit.MILLISECONDS); - } + scheduledExec.schedule(new PollTask(), pollPeriodInMS, TimeUnit.MILLISECONDS); } lifecycleLock.started(); log.info("MetadataSegmentView Started."); From e2a9af746e911ebeb18276cfd111dd4d0d1e9f83 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Fri, 1 Feb 2019 19:57:08 -0800 Subject: [PATCH 26/26] take the log message out of condition in stop() --- .../apache/druid/sql/calcite/schema/MetadataSegmentView.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java index 8e4cec4ef4c0..50fe3133cd28 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/MetadataSegmentView.java @@ -124,11 +124,11 @@ public void stop() if (!lifecycleLock.canStop()) { throw new ISE("can't stop."); } + log.info("MetadataSegmentView is stopping."); if (isCacheEnabled) { - log.info("MetadataSegmentView is stopping."); scheduledExec.shutdown(); - log.info("MetadataSegmentView Stopped."); } + log.info("MetadataSegmentView Stopped."); } private void poll()