diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 0d199a83e06e..54f395e54827 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -286,6 +286,8 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox, fireDepartmentMetrics); final KafkaConsumer consumer = newConsumer() ) { + toolbox.getDataSegmentServerAnnouncer().announce(); + appenderator = appenderator0; final String topic = ioConfig.getStartPartitions().getTopic(); @@ -567,6 +569,8 @@ public String apply(DataSegment input) } } + toolbox.getDataSegmentServerAnnouncer().unannounce(); + return success(); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index f6acd7dc98e5..52f751757fd1 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -112,6 +112,7 @@ import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; +import io.druid.server.coordination.DataSegmentServerAnnouncer; import io.druid.timeline.DataSegment; import org.apache.curator.test.TestingCluster; import org.apache.kafka.clients.producer.KafkaProducer; @@ -1522,6 +1523,7 @@ public void close() null, // DataSegmentMover null, // DataSegmentArchiver new TestDataSegmentAnnouncer(), + EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), handoffNotifierFactory, makeTimeseriesOnlyConglomerate(), MoreExecutors.sameThreadExecutor(), // queryExecutorService diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java index d9a690eb65ba..7f79a9c4d741 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolbox.java @@ -46,6 +46,7 @@ import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; +import io.druid.server.coordination.DataSegmentServerAnnouncer; import io.druid.timeline.DataSegment; import org.joda.time.Interval; @@ -70,6 +71,7 @@ public class TaskToolbox private final DataSegmentArchiver dataSegmentArchiver; private final DataSegmentMover dataSegmentMover; private final DataSegmentAnnouncer segmentAnnouncer; + private final DataSegmentServerAnnouncer serverAnnouncer; private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final MonitorScheduler monitorScheduler; @@ -93,6 +95,7 @@ public TaskToolbox( DataSegmentMover dataSegmentMover, DataSegmentArchiver dataSegmentArchiver, DataSegmentAnnouncer segmentAnnouncer, + DataSegmentServerAnnouncer serverAnnouncer, SegmentHandoffNotifierFactory handoffNotifierFactory, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, ExecutorService queryExecutorService, @@ -116,6 +119,7 @@ public TaskToolbox( this.dataSegmentMover = dataSegmentMover; this.dataSegmentArchiver = dataSegmentArchiver; this.segmentAnnouncer = segmentAnnouncer; + this.serverAnnouncer = serverAnnouncer; this.handoffNotifierFactory = handoffNotifierFactory; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; this.queryExecutorService = queryExecutorService; @@ -170,6 +174,11 @@ public DataSegmentAnnouncer getSegmentAnnouncer() return segmentAnnouncer; } + public DataSegmentServerAnnouncer getDataSegmentServerAnnouncer() + { + return serverAnnouncer; + } + public SegmentHandoffNotifierFactory getSegmentHandoffNotifierFactory() { return handoffNotifierFactory; diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java index 291fbde00b84..7527d1b8761b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskToolboxFactory.java @@ -40,6 +40,7 @@ import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; +import io.druid.server.coordination.DataSegmentServerAnnouncer; import java.io.File; import java.util.concurrent.ExecutorService; @@ -57,6 +58,7 @@ public class TaskToolboxFactory private final DataSegmentMover dataSegmentMover; private final DataSegmentArchiver dataSegmentArchiver; private final DataSegmentAnnouncer segmentAnnouncer; + private final DataSegmentServerAnnouncer serverAnnouncer; private final SegmentHandoffNotifierFactory handoffNotifierFactory; private final QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate; private final ExecutorService queryExecutorService; @@ -79,6 +81,7 @@ public TaskToolboxFactory( DataSegmentMover dataSegmentMover, DataSegmentArchiver dataSegmentArchiver, DataSegmentAnnouncer segmentAnnouncer, + DataSegmentServerAnnouncer serverAnnouncer, SegmentHandoffNotifierFactory handoffNotifierFactory, QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate, @Processing ExecutorService queryExecutorService, @@ -100,6 +103,7 @@ public TaskToolboxFactory( this.dataSegmentMover = dataSegmentMover; this.dataSegmentArchiver = dataSegmentArchiver; this.segmentAnnouncer = segmentAnnouncer; + this.serverAnnouncer = serverAnnouncer; this.handoffNotifierFactory = handoffNotifierFactory; this.queryRunnerFactoryConglomerate = queryRunnerFactoryConglomerate; this.queryExecutorService = queryExecutorService; @@ -126,6 +130,7 @@ public TaskToolbox build(Task task) dataSegmentMover, dataSegmentArchiver, segmentAnnouncer, + serverAnnouncer, handoffNotifierFactory, queryRunnerFactoryConglomerate, queryExecutorService, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 60bf62a9d3af..b9e43a5b1ead 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -248,12 +248,6 @@ public void unannounceSegments(Iterable segments) throws IOExceptio } } } - - @Override - public boolean isAnnounced(DataSegment segment) - { - return toolbox.getSegmentAnnouncer().isAnnounced(segment); - } }; // NOTE: getVersion will block if there is lock contention, which will block plumber.getSink @@ -326,6 +320,8 @@ public String getVersion(final Interval interval) Supplier committerSupplier = null; try { + toolbox.getDataSegmentServerAnnouncer().announce(); + plumber.startJob(); // Set up metrics emission @@ -425,6 +421,8 @@ public void run() toolbox.getMonitorScheduler().removeMonitor(metricsMonitor); } } + + toolbox.getDataSegmentServerAnnouncer().unannounce(); } log.info("Job done!"); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java index 86a513816ab2..76e23a4f393f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TaskToolboxTest.java @@ -41,6 +41,7 @@ import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.coordination.DataSegmentAnnouncer; +import io.druid.server.coordination.DataSegmentServerAnnouncer; import io.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.joda.time.Interval; @@ -102,6 +103,7 @@ public void setUp() throws IOException mockDataSegmentMover, mockDataSegmentArchiver, mockSegmentAnnouncer, + EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), mockHandoffNotifierFactory, mockQueryRunnerFactoryConglomerate, mockQueryExecutorService, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 31c05bc47ea4..4dcd8d266b3e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -550,7 +550,7 @@ public DataSegment push(File file, DataSegment segment) throws IOException segments.add(segment); return segment; } - }, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(), + }, null, null, null, null, null, null, null, null, null, null, jsonMapper, temporaryFolder.newFolder(), indexMerger, indexIO, null, null, indexMergerV9 ) ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 7f4464f7d540..c204f55edf7e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -101,6 +101,7 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory; +import io.druid.server.coordination.DataSegmentServerAnnouncer; import io.druid.timeline.DataSegment; import org.easymock.EasyMock; import org.hamcrest.CoreMatchers; @@ -1042,6 +1043,7 @@ Map> getHandOffCallbacks() null, // DataSegmentMover null, // DataSegmentArchiver new TestDataSegmentAnnouncer(), + EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), handoffNotifierFactory, conglomerate, MoreExecutors.sameThreadExecutor(), // queryExecutorService diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java index 09d1973c2245..5bdb182c4b12 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/SameIntervalMergeTaskTest.java @@ -200,7 +200,7 @@ public DataSegment push(File file, DataSegment segment) throws IOException segments.add(segment); return segment; } - }, null, null, null, null, null, null, null, null, new SegmentLoader() + }, null, null, null, null, null, null, null, null, null, new SegmentLoader() { @Override public boolean isSegmentLoaded(DataSegment segment) throws SegmentLoadingException diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index b7431f6f3abd..aa42b03e10f5 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -270,6 +270,7 @@ public DataSegment restore(DataSegment segment) throws SegmentLoadingException } }, null, // segment announcer + null, notifierFactory, null, // query runner factory conglomerate corporation unionized collective null, // query executor service diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java index 165b29fc7fee..65fbbb613f3c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTimelineTest.java @@ -318,6 +318,7 @@ public TaskActionClient create(Task task) null, // segment mover null, // segment archiver null, // segment announcer, + null, notifierFactory, null, // query runner factory conglomerate corporation unionized collective null, // query executor service diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index a4373d5be9a2..4b044026146f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -102,6 +102,7 @@ import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.DruidNode; import io.druid.server.coordination.DataSegmentAnnouncer; +import io.druid.server.coordination.DataSegmentServerAnnouncer; import io.druid.server.metrics.NoopServiceEmitter; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; @@ -566,13 +567,8 @@ public void unannounceSegments(Iterable segments) throws IOExceptio { } - - @Override - public boolean isAnnounced(DataSegment segment) - { - return false; - } }, // segment announcer + EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), handoffNotifierFactory, queryRunnerFactoryConglomerate, // query runner factory conglomerate corporation unionized collective MoreExecutors.sameThreadExecutor(), // query executor service diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java index 2e36d2a18184..be5ef6df9ea2 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestDataSegmentAnnouncer.java @@ -59,12 +59,6 @@ public void unannounceSegments(Iterable segments) throws IOExceptio } } - @Override - public boolean isAnnounced(DataSegment segment) - { - return announcedSegments.contains(segment); - } - public Set getAnnouncedSegments() { return ImmutableSet.copyOf(announcedSegments); diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java index e79201cc62bb..3a5af4805634 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestServerView.java @@ -131,4 +131,16 @@ public Iterable getInventory() { return null; } + + @Override + public boolean isStarted() + { + return true; + } + + @Override + public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) + { + return false; + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java index 3ac1cad36e32..05e48011b84b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/WorkerTaskMonitorTest.java @@ -169,7 +169,7 @@ private WorkerTaskMonitor createTaskMonitor() new TaskToolboxFactory( taskConfig, taskActionClientFactory, - null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory( + null, null, null, null, null, null, null, notifierFactory, null, null, null, new SegmentLoaderFactory( new SegmentLoaderLocalCacheManager( null, new SegmentLoaderConfig() diff --git a/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java b/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java new file mode 100644 index 000000000000..996bf559c5e1 --- /dev/null +++ b/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java @@ -0,0 +1,365 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.MapMaker; +import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; +import io.druid.curator.inventory.CuratorInventoryManager; +import io.druid.curator.inventory.CuratorInventoryManagerStrategy; +import io.druid.curator.inventory.InventoryManagerConfig; +import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.lifecycle.LifecycleStart; +import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.timeline.DataSegment; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.utils.ZKPaths; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + */ +public abstract class AbstractCuratorServerInventoryView implements ServerInventoryView +{ + + private final EmittingLogger log; + private final CuratorFramework curator; + private final CuratorInventoryManager inventoryManager; + private final AtomicBoolean started = new AtomicBoolean(false); + + private final ConcurrentMap serverCallbacks = new MapMaker().makeMap(); + private final ConcurrentMap segmentCallbacks = new MapMaker().makeMap(); + + public AbstractCuratorServerInventoryView( + final EmittingLogger log, + final String announcementsPath, + final String inventoryPath, + final CuratorFramework curator, + final ObjectMapper jsonMapper, + final TypeReference typeReference + ) + { + this.log = log; + this.curator = curator; + this.inventoryManager = new CuratorInventoryManager<>( + curator, + new InventoryManagerConfig() + { + @Override + public String getContainerPath() + { + return announcementsPath; + } + + @Override + public String getInventoryPath() + { + return inventoryPath; + } + }, + Execs.singleThreaded("ServerInventoryView-%s"), + new CuratorInventoryManagerStrategy() + { + @Override + public DruidServer deserializeContainer(byte[] bytes) + { + try { + return jsonMapper.readValue(bytes, DruidServer.class); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @Override + public InventoryType deserializeInventory(byte[] bytes) + { + try { + return jsonMapper.readValue(bytes, typeReference); + } + catch (IOException e) { + CharBuffer.wrap(StringUtils.fromUtf8(bytes).toCharArray()); + CharBuffer charBuffer = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)); + log.error(e, "Could not parse json: %s", charBuffer.toString()); + throw Throwables.propagate(e); + } + } + + @Override + public void newContainer(DruidServer container) + { + log.info("New Server[%s]", container); + } + + @Override + public void deadContainer(DruidServer deadContainer) + { + log.info("Server Disappeared[%s]", deadContainer); + runServerCallbacks(deadContainer); + } + + @Override + public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer) + { + return newContainer.addDataSegments(oldContainer); + } + + @Override + public DruidServer addInventory( + final DruidServer container, + String inventoryKey, + final InventoryType inventory + ) + { + return addInnerInventory(container, inventoryKey, inventory); + } + + @Override + public DruidServer updateInventory( + DruidServer container, String inventoryKey, InventoryType inventory + ) + { + return updateInnerInventory(container, inventoryKey, inventory); + } + + @Override + public DruidServer removeInventory(final DruidServer container, String inventoryKey) + { + return removeInnerInventory(container, inventoryKey); + } + + @Override + public void inventoryInitialized() + { + log.info("Inventory Initialized"); + runSegmentCallbacks( + new Function() + { + @Override + public CallbackAction apply(SegmentCallback input) + { + return input.segmentViewInitialized(); + } + } + ); + } + } + ); + } + + @LifecycleStart + public void start() throws Exception + { + synchronized (started) { + if (!started.get()) { + inventoryManager.start(); + started.set(true); + } + } + } + + @LifecycleStop + public void stop() throws IOException + { + synchronized (started) { + if (started.getAndSet(false)) { + inventoryManager.stop(); + } + } + } + + @Override + public boolean isStarted() + { + return started.get(); + } + + @Override + public DruidServer getInventoryValue(String containerKey) + { + return inventoryManager.getInventoryValue(containerKey); + } + + @Override + public Iterable getInventory() + { + return inventoryManager.getInventory(); + } + + @Override + public void registerServerCallback(Executor exec, ServerCallback callback) + { + serverCallbacks.put(callback, exec); + } + + @Override + public void registerSegmentCallback(Executor exec, SegmentCallback callback) + { + segmentCallbacks.put(callback, exec); + } + + public InventoryManagerConfig getInventoryManagerConfig() + { + return inventoryManager.getConfig(); + } + + protected void runSegmentCallbacks( + final Function fn + ) + { + for (final Map.Entry entry : segmentCallbacks.entrySet()) { + entry.getValue().execute( + new Runnable() + { + @Override + public void run() + { + if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) { + segmentCallbackRemoved(entry.getKey()); + segmentCallbacks.remove(entry.getKey()); + } + } + } + ); + } + } + + protected void runServerCallbacks(final DruidServer server) + { + for (final Map.Entry entry : serverCallbacks.entrySet()) { + entry.getValue().execute( + new Runnable() + { + @Override + public void run() + { + if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) { + serverCallbacks.remove(entry.getKey()); + } + } + } + ); + } + } + + protected void addSingleInventory( + final DruidServer container, + final DataSegment inventory + ) + { + log.debug("Server[%s] added segment[%s]", container.getName(), inventory.getIdentifier()); + + if (container.getSegment(inventory.getIdentifier()) != null) { + log.warn( + "Not adding or running callbacks for existing segment[%s] on server[%s]", + inventory.getIdentifier(), + container.getName() + ); + + return; + } + + container.addDataSegment(inventory.getIdentifier(), inventory); + + runSegmentCallbacks( + new Function() + { + @Override + public CallbackAction apply(SegmentCallback input) + { + return input.segmentAdded(container.getMetadata(), inventory); + } + } + ); + } + + protected void removeSingleInventory(final DruidServer container, String inventoryKey) + { + log.debug("Server[%s] removed segment[%s]", container.getName(), inventoryKey); + final DataSegment segment = container.getSegment(inventoryKey); + + if (segment == null) { + log.warn( + "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]", + inventoryKey, + container.getName() + ); + + return; + } + + container.removeDataSegment(inventoryKey); + + runSegmentCallbacks( + new Function() + { + @Override + public CallbackAction apply(SegmentCallback input) + { + return input.segmentRemoved(container.getMetadata(), segment); + } + } + ); + } + + @Override + public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) + { + try { + String toServedSegPath = ZKPaths.makePath( + ZKPaths.makePath(getInventoryManagerConfig().getInventoryPath(), serverKey), + segment.getIdentifier() + ); + return curator.checkExists().forPath(toServedSegPath) != null; + } catch (Exception ex) { + throw Throwables.propagate(ex); + } + } + + protected abstract DruidServer addInnerInventory( + final DruidServer container, + String inventoryKey, + final InventoryType inventory + ); + + protected abstract DruidServer updateInnerInventory( + final DruidServer container, + String inventoryKey, + final InventoryType inventory + ); + + protected abstract DruidServer removeInnerInventory( + final DruidServer container, + String inventoryKey + ); + + protected abstract void segmentCallbackRemoved(SegmentCallback callback); +} diff --git a/server/src/main/java/io/druid/client/BatchServerInventoryView.java b/server/src/main/java/io/druid/client/BatchServerInventoryView.java index f09579eb6fcb..9fd9f5888334 100644 --- a/server/src/main/java/io/druid/client/BatchServerInventoryView.java +++ b/server/src/main/java/io/druid/client/BatchServerInventoryView.java @@ -45,7 +45,7 @@ /** */ @ManageLifecycle -public class BatchServerInventoryView extends ServerInventoryView> +public class BatchServerInventoryView extends AbstractCuratorServerInventoryView> implements FilteredServerInventoryView { private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class); diff --git a/server/src/main/java/io/druid/client/CoordinatorServerView.java b/server/src/main/java/io/druid/client/CoordinatorServerView.java index ee32b07c1db4..a572deecd14a 100644 --- a/server/src/main/java/io/druid/client/CoordinatorServerView.java +++ b/server/src/main/java/io/druid/client/CoordinatorServerView.java @@ -207,4 +207,16 @@ public Iterable getInventory() { return baseView.getInventory(); } + + @Override + public boolean isStarted() + { + return baseView.isStarted(); + } + + @Override + public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) + { + return baseView.isSegmentLoadedByServer(serverKey, segment); + } } diff --git a/server/src/main/java/io/druid/client/DruidServer.java b/server/src/main/java/io/druid/client/DruidServer.java index 1a05b29422cc..0c903186ecee 100644 --- a/server/src/main/java/io/druid/client/DruidServer.java +++ b/server/src/main/java/io/druid/client/DruidServer.java @@ -226,6 +226,15 @@ public Iterable getDataSources() return dataSources.values(); } + public void removeAllSegments() + { + synchronized (lock) { + dataSources.clear(); + segments.clear(); + currSize = 0; + } + } + @Override public boolean equals(Object o) { diff --git a/server/src/main/java/io/druid/client/DruidServerDiscovery.java b/server/src/main/java/io/druid/client/DruidServerDiscovery.java new file mode 100644 index 000000000000..7b28ef6850ae --- /dev/null +++ b/server/src/main/java/io/druid/client/DruidServerDiscovery.java @@ -0,0 +1,174 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.metamx.emitter.EmittingLogger; +import io.druid.concurrent.Execs; +import io.druid.curator.inventory.CuratorInventoryManager; +import io.druid.curator.inventory.CuratorInventoryManagerStrategy; +import io.druid.curator.inventory.InventoryManagerConfig; +import io.druid.java.util.common.ISE; +import org.apache.curator.framework.CuratorFramework; + +import java.io.IOException; + +/** + * Discovers DruidServer instances that serve segments using CuratorInventoryManager. + */ +public class DruidServerDiscovery +{ + private final EmittingLogger log = new EmittingLogger(DruidServerDiscovery.class); + private final CuratorInventoryManager curatorInventoryManager; + private volatile Listener listener; + + DruidServerDiscovery( + final CuratorFramework curatorFramework, + final String announcementsPath, + final ObjectMapper jsonMapper + ) + { + curatorInventoryManager = initCuratorInventoryManager(curatorFramework, announcementsPath, jsonMapper); + } + + public void start() throws Exception + { + Preconditions.checkNotNull(listener, "listener is not configured yet"); + curatorInventoryManager.start(); + } + + public void stop() throws IOException + { + curatorInventoryManager.stop(); + } + + private CuratorInventoryManager initCuratorInventoryManager( + final CuratorFramework curator, + final String announcementsPath, + final ObjectMapper jsonMapper + ) + { + return new CuratorInventoryManager<>( + curator, + new InventoryManagerConfig() + { + @Override + public String getContainerPath() + { + return announcementsPath; + } + + @Override + public String getInventoryPath() + { + return "/NON_EXISTENT_DUMMY_INVENTORY_PATH"; + } + }, + Execs.singleThreaded("CuratorInventoryManagerBasedServerDiscovery-%s"), + new CuratorInventoryManagerStrategy() + { + @Override + public DruidServer deserializeContainer(byte[] bytes) + { + try { + return jsonMapper.readValue(bytes, DruidServer.class); + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + @Override + public void newContainer(DruidServer container) + { + log.info("New Server[%s]", container.getName()); + listener.serverAdded(container); + } + + @Override + public void deadContainer(DruidServer container) + { + log.info("Server Disappeared[%s]", container.getName()); + listener.serverRemoved(container); + } + + @Override + public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer) + { + log.info("Server updated[%s]", oldContainer.getName()); + return listener.serverUpdated(oldContainer, newContainer); + } + + @Override + public Object deserializeInventory(byte[] bytes) + { + throw new ISE("no inventory should exist."); + } + + @Override + public DruidServer addInventory( + final DruidServer container, + String inventoryKey, + final Object inventory + ) + { + throw new ISE("no inventory should exist."); + } + + @Override + public DruidServer updateInventory( + DruidServer container, String inventoryKey, Object inventory + ) + { + throw new ISE("no inventory should exist."); + } + + @Override + public DruidServer removeInventory(final DruidServer container, String inventoryKey) + { + throw new ISE("no inventory should exist."); + } + + @Override + public void inventoryInitialized() + { + log.info("Server inventory initialized."); + listener.initialized(); + } + } + ); + } + + public void registerListener(Listener listener) + { + Preconditions.checkArgument(this.listener == null, "listener registered already."); + this.listener = listener; + } + + public interface Listener + { + void serverAdded(DruidServer server); + DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer); + void serverRemoved(DruidServer server); + void initialized(); + } +} diff --git a/server/src/main/java/io/druid/client/FilteredHttpServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/FilteredHttpServerInventoryViewProvider.java new file mode 100644 index 000000000000..6dbb7cf4bcb6 --- /dev/null +++ b/server/src/main/java/io/druid/client/FilteredHttpServerInventoryViewProvider.java @@ -0,0 +1,78 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicates; +import com.metamx.http.client.HttpClient; +import io.druid.guice.annotations.Client; +import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; +import io.druid.java.util.common.Pair; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.initialization.ZkPathsConfig; +import io.druid.timeline.DataSegment; +import org.apache.curator.framework.CuratorFramework; + +import javax.validation.constraints.NotNull; + +/** + */ +public class FilteredHttpServerInventoryViewProvider implements FilteredServerInventoryViewProvider +{ + @JacksonInject + @NotNull + @Client + HttpClient httpClient = null; + + @JacksonInject + @NotNull + @Smile + ObjectMapper smileMapper = null; + + @JacksonInject + @NotNull + @Json + ObjectMapper jsonMapper = null; + + @JacksonInject + @NotNull + HttpServerInventoryViewConfig config = null; + + @JacksonInject + @NotNull + private ZkPathsConfig zkPaths = null; + + @JacksonInject + @NotNull + private CuratorFramework curator = null; + + @Override + public HttpServerInventoryView get() + { + return new HttpServerInventoryView( + jsonMapper, smileMapper, httpClient, + new DruidServerDiscovery(curator, zkPaths.getAnnouncementsPath(), jsonMapper), + Predicates.>alwaysTrue(), + config + ); + } +} diff --git a/server/src/main/java/io/druid/client/FilteredServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/FilteredServerInventoryViewProvider.java index f6f7fa746085..c64cb4aa3c6d 100644 --- a/server/src/main/java/io/druid/client/FilteredServerInventoryViewProvider.java +++ b/server/src/main/java/io/druid/client/FilteredServerInventoryViewProvider.java @@ -27,7 +27,8 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = FilteredBatchServerInventoryViewProvider.class) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "legacy", value = FilteredSingleServerInventoryViewProvider.class), - @JsonSubTypes.Type(name = "batch", value = FilteredBatchServerInventoryViewProvider.class) + @JsonSubTypes.Type(name = "batch", value = FilteredBatchServerInventoryViewProvider.class), + @JsonSubTypes.Type(name = "http", value = FilteredHttpServerInventoryViewProvider.class) }) public interface FilteredServerInventoryViewProvider extends Provider { diff --git a/server/src/main/java/io/druid/client/HttpServerInventoryView.java b/server/src/main/java/io/druid/client/HttpServerInventoryView.java new file mode 100644 index 000000000000..c2bb8eee61b8 --- /dev/null +++ b/server/src/main/java/io/druid/client/HttpServerInventoryView.java @@ -0,0 +1,663 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.MapMaker; +import com.google.common.net.HostAndPort; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.inject.Inject; +import com.metamx.emitter.EmittingLogger; +import com.metamx.http.client.HttpClient; +import com.metamx.http.client.Request; +import com.metamx.http.client.io.AppendableByteArrayInputStream; +import com.metamx.http.client.response.ClientResponse; +import com.metamx.http.client.response.InputStreamResponseHandler; +import io.druid.concurrent.LifecycleLock; +import io.druid.guice.annotations.Global; +import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.Pair; +import io.druid.java.util.common.lifecycle.LifecycleStart; +import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.server.coordination.DataSegmentChangeCallback; +import io.druid.server.coordination.DataSegmentChangeHandler; +import io.druid.server.coordination.DataSegmentChangeRequest; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.SegmentChangeRequestHistory; +import io.druid.server.coordination.SegmentChangeRequestsSnapshot; +import io.druid.timeline.DataSegment; +import org.jboss.netty.handler.codec.http.HttpHeaders; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.joda.time.Duration; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +/** + * This class uses CuratorInventoryManager to listen for queryable server membership which serve segments(e.g. Historicals). + * For each queryable server, it uses HTTP GET /druid-internal/v1/segments (see docs in SegmentListerResource.getSegments(..). + */ +public class HttpServerInventoryView implements ServerInventoryView, FilteredServerInventoryView +{ + private final EmittingLogger log = new EmittingLogger(HttpServerInventoryView.class); + private final DruidServerDiscovery serverDiscovery; + + private final LifecycleLock lifecycleLock = new LifecycleLock(); + + private final ConcurrentMap serverCallbacks = new MapMaker().makeMap(); + private final ConcurrentMap segmentCallbacks = new MapMaker().makeMap(); + + private final ConcurrentMap>> segmentPredicates = new MapMaker() + .makeMap(); + private final Predicate> defaultFilter; + private volatile Predicate> finalPredicate; + + // For each queryable server, a name -> DruidServerHolder entry is kept + private final Map servers = new HashMap<>(); + + private volatile ExecutorService executor; + + // a queue of queryable server names for which worker threads in executor initiate the segment list call i.e. + // DruidServerHolder.updateSegmentsListAsync(..) which updates the segment list asynchronously and adds itself + // to this queue again for next update. + private final BlockingQueue queue = new LinkedBlockingDeque<>(); + + + + private final HttpClient httpClient; + private final ObjectMapper smileMapper; + private final HttpServerInventoryViewConfig config; + + @Inject + public HttpServerInventoryView( + final @Json ObjectMapper jsonMapper, + final @Smile ObjectMapper smileMapper, + final @Global HttpClient httpClient, + final DruidServerDiscovery serverDiscovery, + final Predicate> defaultFilter, + final HttpServerInventoryViewConfig config + ) + { + this.httpClient = httpClient; + this.smileMapper = smileMapper; + this.serverDiscovery = serverDiscovery; + this.defaultFilter = defaultFilter; + this.finalPredicate = defaultFilter; + this.config = config; + } + + + @LifecycleStart + public void start() throws Exception + { + synchronized (lifecycleLock) { + if (!lifecycleLock.canStart()) { + throw new ISE("can't start."); + } + + log.info("Starting HttpServerInventoryView."); + + try { + executor = Executors.newFixedThreadPool( + config.getNumThreads(), + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HttpServerInventoryView-%s").build() + ); + + executor.execute( + new Runnable() + { + @Override + public void run() + { + if (!lifecycleLock.awaitStarted()) { + log.error("WTF! lifecycle not started, segments will not be discovered."); + return; + } + + while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { + String name = queue.take(); + + synchronized (servers) { + DruidServerHolder holder = servers.get(name); + if (holder != null) { + holder.updateSegmentsListAsync(); + } + } + } + catch (InterruptedException ex) { + log.info("main thread interrupted, served segments list is not synced anymore."); + Thread.currentThread().interrupt(); + } + catch (Throwable th) { + log.makeAlert(th, "main thread ignored error").emit(); + } + } + + log.info("HttpServerInventoryView main thread exited."); + } + } + ); + + serverDiscovery.registerListener( + new DruidServerDiscovery.Listener() + { + @Override + public void serverAdded(DruidServer server) + { + serverAddedOrUpdated(server); + } + + @Override + public DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer) + { + return serverAddedOrUpdated(newServer); + } + + @Override + public void serverRemoved(DruidServer server) + { + HttpServerInventoryView.this.serverRemoved(server); + runServerCallbacks(server); + } + + @Override + public void initialized() + { + serverInventoryInitialized(); + } + } + ); + serverDiscovery.start(); + + log.info("Started HttpServerInventoryView."); + lifecycleLock.started(); + } finally { + lifecycleLock.exitStart(); + } + } + } + + @LifecycleStop + public void stop() throws IOException + { + synchronized (lifecycleLock) { + if (!lifecycleLock.canStop()) { + throw new ISE("can't stop."); + } + + log.info("Stopping HttpServerInventoryView."); + + serverDiscovery.stop(); + + if (executor != null) { + executor.shutdownNow(); + executor = null; + } + + queue.clear(); + + log.info("Stopped HttpServerInventoryView."); + } + } + + @Override + public void registerSegmentCallback( + Executor exec, SegmentCallback callback, Predicate> filter + ) + { + segmentCallbacks.put(callback, exec); + segmentPredicates.put(callback, filter); + + finalPredicate = Predicates.or( + defaultFilter, + Predicates.or(segmentPredicates.values()) + ); + } + + @Override + public void registerServerCallback(Executor exec, ServerCallback callback) + { + serverCallbacks.put(callback, exec); + } + + @Override + public void registerSegmentCallback(Executor exec, SegmentCallback callback) + { + segmentCallbacks.put(callback, exec); + } + + @Override + public DruidServer getInventoryValue(String containerKey) + { + synchronized (servers) { + DruidServerHolder holder = servers.get(containerKey); + if (holder != null) { + return holder.druidServer; + } + } + + return null; + } + + @Override + public Iterable getInventory() + { + synchronized (servers) { + return Iterables.transform( + servers.values(), new com.google.common.base.Function() + { + @Override + public DruidServer apply(DruidServerHolder input) + { + return input.druidServer; + } + } + ); + } + } + + private void runSegmentCallbacks( + final Function fn + ) + { + for (final Map.Entry entry : segmentCallbacks.entrySet()) { + entry.getValue().execute( + new Runnable() + { + @Override + public void run() + { + if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) { + segmentCallbacks.remove(entry.getKey()); + if (segmentPredicates.remove(entry.getKey()) != null) { + finalPredicate = Predicates.or( + defaultFilter, + Predicates.or(segmentPredicates.values()) + ); + } + } + } + } + ); + } + } + + private void runServerCallbacks(final DruidServer server) + { + for (final Map.Entry entry : serverCallbacks.entrySet()) { + entry.getValue().execute( + new Runnable() + { + @Override + public void run() + { + if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) { + serverCallbacks.remove(entry.getKey()); + } + } + } + ); + } + } + + //best effort wait for first segment listing fetch from all servers and then call + //segmentViewInitialized on all registered segment callbacks. + private void serverInventoryInitialized() + { + for (DruidServerHolder server : servers.values()) { + server.awaitInitialization(); + } + + log.info("Calling SegmentCallback.segmentViewInitialized() for all callbacks."); + + runSegmentCallbacks( + new Function() + { + @Override + public CallbackAction apply(SegmentCallback input) + { + return input.segmentViewInitialized(); + } + } + ); + } + + private DruidServer serverAddedOrUpdated(DruidServer server) + { + DruidServerHolder curr; + DruidServerHolder newHolder; + synchronized (servers) { + curr = servers.get(server.getName()); + newHolder = curr == null ? new DruidServerHolder(server) : curr.updatedHolder(server); + servers.put(server.getName(), newHolder); + } + + newHolder.updateSegmentsListAsync(); + + return newHolder.druidServer; + } + + private void serverRemoved(DruidServer server) + { + synchronized (servers) { + servers.remove(server.getName()); + } + } + + public DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer) + { + return serverAddedOrUpdated(newServer); + } + + @Override + public boolean isStarted() + { + return lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS); + } + + @Override + public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) + { + synchronized (servers) { + DruidServerHolder holder = servers.get(serverKey); + if (holder != null) { + return holder.druidServer.getSegment(segment.getIdentifier()) != null; + } else { + return false; + } + } + } + + private class DruidServerHolder + { + private final Object lock = new Object(); + + //lock is used to keep state in counter and and segment list in druidServer consistent + // so that in "updateHolder()" method, new DruidServerHolder with updated DruidServer info + // can be safely created + private final DruidServer druidServer; + + private volatile SegmentChangeRequestHistory.Counter counter = null; + + private final HostAndPort serverHostAndPort; + + private final DataSegmentChangeHandler changeHandler; + private final long serverHttpTimeout = config.getServerTimeout() + 1000; + + private final CountDownLatch initializationLatch = new CountDownLatch(1); + + DruidServerHolder(DruidServer druidServer) + { + this(druidServer, null); + } + + private DruidServerHolder(final DruidServer druidServer, final SegmentChangeRequestHistory.Counter counter) + { + this.druidServer = druidServer; + this.serverHostAndPort = HostAndPort.fromString(druidServer.getHost()); + this.counter = counter; + changeHandler = new DataSegmentChangeHandler() + { + @Override + public void addSegment( + final DataSegment segment, final DataSegmentChangeCallback callback + ) + { + if (finalPredicate.apply(Pair.of(druidServer.getMetadata(), segment))) { + druidServer.addDataSegment(segment.getIdentifier(), segment); + runSegmentCallbacks( + new Function() + { + @Override + public CallbackAction apply(SegmentCallback input) + { + return input.segmentAdded(druidServer.getMetadata(), segment); + } + } + ); + } + } + + @Override + public void removeSegment( + final DataSegment segment, final DataSegmentChangeCallback callback + ) + { + druidServer.removeDataSegment(segment.getIdentifier()); + + runSegmentCallbacks( + new Function() + { + @Override + public CallbackAction apply(SegmentCallback input) + { + return input.segmentRemoved(druidServer.getMetadata(), segment); + } + } + ); + } + }; + } + + //wait for first fetch of segment listing from server. + void awaitInitialization() + { + try { + if (!initializationLatch.await(serverHttpTimeout, TimeUnit.MILLISECONDS)) { + log.warn("Await initialization timed out for server [%s].", druidServer.getName()); + } + } catch (InterruptedException ex) { + log.warn("Await initialization interrupted while waiting on server [%s].", druidServer.getName()); + Thread.currentThread().interrupt(); + } + } + + DruidServerHolder updatedHolder(DruidServer server) + { + synchronized (lock) { + return new DruidServerHolder(server.addDataSegments(druidServer), counter) ; + } + } + + Future updateSegmentsListAsync() + { + try { + final String req; + if (counter != null) { + req = String.format( + "/druid-internal/v1/segments?counter=%s&hash=%s&timeout=%s", + counter.getCounter(), + counter.getHash(), + config.getServerTimeout() + ); + } else { + req = String.format( + "/druid-internal/v1/segments?counter=-1&timeout=%s", + config.getServerTimeout() + ); + } + URL url = new URL("http", serverHostAndPort.getHostText(), serverHostAndPort.getPort(), req); + + BytesAccumulatingResponseHandler responseHandler = new BytesAccumulatingResponseHandler(); + + log.debug("Sending segment list fetch request to [%s] on URL [%s]", druidServer.getName(), url); + + ListenableFuture future = httpClient.go( + new Request(HttpMethod.GET, url) + .addHeader( + HttpHeaders.Names.ACCEPT, + SmileMediaTypes.APPLICATION_JACKSON_SMILE + ) + .addHeader(HttpHeaders.Names.CONTENT_TYPE, SmileMediaTypes.APPLICATION_JACKSON_SMILE), + responseHandler, + new Duration(serverHttpTimeout) + ); + + log.debug("Sent segment list fetch request to [%s]", druidServer.getName()); + + Futures.addCallback( + future, + new FutureCallback() + { + @Override + public void onSuccess(InputStream stream) + { + try { + if (responseHandler.status == HttpServletResponse.SC_NO_CONTENT) { + log.debug("Received NO CONTENT from [%s]", druidServer.getName()); + return; + } else if (responseHandler.status != HttpServletResponse.SC_OK) { + onFailure(null); + return; + } + + log.debug("Received segment list response from [%s]", druidServer.getName()); + + SegmentChangeRequestsSnapshot delta = smileMapper.readValue( + stream, + SegmentChangeRequestsSnapshot.class + ); + + log.debug("Finished reading segment list response from [%s]", druidServer.getName()); + + synchronized (lock) { + if (delta.isResetCounter()) { + log.debug( + "Server [%s] requested resetCounter for reason [%s].", + druidServer.getName(), + delta.getResetCause() + ); + counter = null; + return; + } + + if (counter == null) { + druidServer.removeAllSegments(); + } + + for (DataSegmentChangeRequest request : delta.getRequests()) { + request.go(changeHandler, null); + } + counter = delta.getCounter(); + } + + initializationLatch.countDown(); + } + catch (Exception ex) { + log.error(ex, "error processing segment list response from server [%s]", druidServer.getName()); + } + finally { + queue.add(druidServer.getName()); + } + } + + @Override + public void onFailure(Throwable t) + { + try { + if (t != null) { + log.error( + t, + "failed to fetch segment list from server [%s]. Return code [%s], Reason: [%s]", + druidServer.getName(), + responseHandler.status, + responseHandler.description + ); + } else { + log.error( + "failed to fetch segment list from server [%s]. Return code [%s], Reason: [%s]", + druidServer.getName(), + responseHandler.status, + responseHandler.description + ); + } + + // sleep for a bit so that retry does not happen immediately. + try { + Thread.sleep(5000); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + finally { + queue.add(druidServer.getName()); + } + } + }, + executor + ); + + return future; + } catch (Throwable th) { + queue.add(druidServer.getName()); + log.makeAlert(th, "Fatal error while fetching segment list from server [%s].", druidServer.getName()).emit(); + + // sleep for a bit so that retry does not happen immediately. + try { + Thread.sleep(5000); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + + throw Throwables.propagate(th); + } + } + } + + private static class BytesAccumulatingResponseHandler extends InputStreamResponseHandler + { + private int status; + private String description; + + @Override + public ClientResponse handleResponse(HttpResponse response) + { + status = response.getStatus().getCode(); + description = response.getStatus().getReasonPhrase(); + return ClientResponse.unfinished(super.handleResponse(response).getObj()); + } + } +} diff --git a/server/src/main/java/io/druid/client/HttpServerInventoryViewConfig.java b/server/src/main/java/io/druid/client/HttpServerInventoryViewConfig.java new file mode 100644 index 000000000000..6375b875ec01 --- /dev/null +++ b/server/src/main/java/io/druid/client/HttpServerInventoryViewConfig.java @@ -0,0 +1,61 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.joda.time.Period; + +/** + */ +public class HttpServerInventoryViewConfig +{ + @JsonProperty + private final long serverTimeout; + + @JsonProperty + private final int numThreads; + + @JsonCreator + public HttpServerInventoryViewConfig( + @JsonProperty("serverTimeout") Period serverTimeout, + @JsonProperty("numThreads") Integer numThreads + ){ + this.serverTimeout = serverTimeout != null + ? serverTimeout.toStandardDuration().getMillis() + : 4*60*1000; //4 mins + + this.numThreads = numThreads != null ? numThreads.intValue() : 5; + + Preconditions.checkArgument(this.serverTimeout > 0, "server timeout must be > 0 ms"); + Preconditions.checkArgument(this.numThreads > 1, "numThreads must be > 1"); + } + + public long getServerTimeout() + { + return serverTimeout; + } + + public int getNumThreads() + { + return numThreads; + } +} diff --git a/server/src/main/java/io/druid/client/HttpServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/HttpServerInventoryViewProvider.java new file mode 100644 index 000000000000..efc109542ec2 --- /dev/null +++ b/server/src/main/java/io/druid/client/HttpServerInventoryViewProvider.java @@ -0,0 +1,80 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicates; +import com.metamx.http.client.HttpClient; +import io.druid.guice.annotations.Client; +import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; +import io.druid.java.util.common.Pair; +import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.initialization.ZkPathsConfig; +import io.druid.timeline.DataSegment; +import org.apache.curator.framework.CuratorFramework; + +import javax.validation.constraints.NotNull; + +/** + */ +public class HttpServerInventoryViewProvider implements ServerInventoryViewProvider +{ + @JacksonInject + @NotNull + @Client + HttpClient httpClient = null; + + @JacksonInject + @NotNull + @Smile + ObjectMapper smileMapper = null; + + @JacksonInject + @NotNull + @Json + ObjectMapper jsonMapper = null; + + @JacksonInject + @NotNull + HttpServerInventoryViewConfig config = null; + + @JacksonInject + @NotNull + private ZkPathsConfig zkPaths = null; + + @JacksonInject + @NotNull + private CuratorFramework curator = null; + + @Override + public HttpServerInventoryView get() + { + return new HttpServerInventoryView( + jsonMapper, + smileMapper, + httpClient, + new DruidServerDiscovery(curator, zkPaths.getAnnouncementsPath(), jsonMapper), + Predicates.>alwaysTrue(), + config + ); + } +} diff --git a/server/src/main/java/io/druid/client/InventoryView.java b/server/src/main/java/io/druid/client/InventoryView.java index 4956f1f65c6c..fd9ea0c0b2b0 100644 --- a/server/src/main/java/io/druid/client/InventoryView.java +++ b/server/src/main/java/io/druid/client/InventoryView.java @@ -19,10 +19,14 @@ package io.druid.client; +import io.druid.timeline.DataSegment; + /** */ public interface InventoryView { - public DruidServer getInventoryValue(String string); - public Iterable getInventory(); + DruidServer getInventoryValue(String string); + Iterable getInventory(); + boolean isStarted(); + boolean isSegmentLoadedByServer(String serverKey, DataSegment segment); } diff --git a/server/src/main/java/io/druid/client/ServerInventoryView.java b/server/src/main/java/io/druid/client/ServerInventoryView.java index 8cc61f9a37cf..27ba7e7d4f5f 100644 --- a/server/src/main/java/io/druid/client/ServerInventoryView.java +++ b/server/src/main/java/io/druid/client/ServerInventoryView.java @@ -19,329 +19,9 @@ package io.druid.client; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; -import com.google.common.base.Function; -import com.google.common.base.Throwables; -import com.google.common.collect.MapMaker; -import com.metamx.emitter.EmittingLogger; -import io.druid.concurrent.Execs; -import io.druid.curator.inventory.CuratorInventoryManager; -import io.druid.curator.inventory.CuratorInventoryManagerStrategy; -import io.druid.curator.inventory.InventoryManagerConfig; -import io.druid.java.util.common.StringUtils; -import io.druid.java.util.common.lifecycle.LifecycleStart; -import io.druid.java.util.common.lifecycle.LifecycleStop; -import io.druid.timeline.DataSegment; -import org.apache.curator.framework.CuratorFramework; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; - /** + * Marker interface for making batch/single/http server inventory view configurable. */ -public abstract class ServerInventoryView implements ServerView, InventoryView +public interface ServerInventoryView extends ServerView, InventoryView { - - private final EmittingLogger log; - private final CuratorInventoryManager inventoryManager; - private final AtomicBoolean started = new AtomicBoolean(false); - - private final ConcurrentMap serverCallbacks = new MapMaker().makeMap(); - private final ConcurrentMap segmentCallbacks = new MapMaker().makeMap(); - - public ServerInventoryView( - final EmittingLogger log, - final String announcementsPath, - final String inventoryPath, - final CuratorFramework curator, - final ObjectMapper jsonMapper, - final TypeReference typeReference - ) - { - this.log = log; - this.inventoryManager = new CuratorInventoryManager<>( - curator, - new InventoryManagerConfig() - { - @Override - public String getContainerPath() - { - return announcementsPath; - } - - @Override - public String getInventoryPath() - { - return inventoryPath; - } - }, - Execs.singleThreaded("ServerInventoryView-%s"), - new CuratorInventoryManagerStrategy() - { - @Override - public DruidServer deserializeContainer(byte[] bytes) - { - try { - return jsonMapper.readValue(bytes, DruidServer.class); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - - @Override - public InventoryType deserializeInventory(byte[] bytes) - { - try { - return jsonMapper.readValue(bytes, typeReference); - } - catch (IOException e) { - CharBuffer.wrap(StringUtils.fromUtf8(bytes).toCharArray()); - CharBuffer charBuffer = Charsets.UTF_8.decode(ByteBuffer.wrap(bytes)); - log.error(e, "Could not parse json: %s", charBuffer.toString()); - throw Throwables.propagate(e); - } - } - - @Override - public void newContainer(DruidServer container) - { - log.info("New Server[%s]", container); - } - - @Override - public void deadContainer(DruidServer deadContainer) - { - log.info("Server Disappeared[%s]", deadContainer); - runServerCallbacks(deadContainer); - } - - @Override - public DruidServer updateContainer(DruidServer oldContainer, DruidServer newContainer) - { - return newContainer.addDataSegments(oldContainer); - } - - @Override - public DruidServer addInventory( - final DruidServer container, - String inventoryKey, - final InventoryType inventory - ) - { - return addInnerInventory(container, inventoryKey, inventory); - } - - @Override - public DruidServer updateInventory( - DruidServer container, String inventoryKey, InventoryType inventory - ) - { - return updateInnerInventory(container, inventoryKey, inventory); - } - - @Override - public DruidServer removeInventory(final DruidServer container, String inventoryKey) - { - return removeInnerInventory(container, inventoryKey); - } - - @Override - public void inventoryInitialized() - { - log.info("Inventory Initialized"); - runSegmentCallbacks( - new Function() - { - @Override - public CallbackAction apply(SegmentCallback input) - { - return input.segmentViewInitialized(); - } - } - ); - } - } - ); - } - - @LifecycleStart - public void start() throws Exception - { - synchronized (started) { - if (!started.get()) { - inventoryManager.start(); - started.set(true); - } - } - } - - @LifecycleStop - public void stop() throws IOException - { - synchronized (started) { - if (started.getAndSet(false)) { - inventoryManager.stop(); - } - } - } - - public boolean isStarted() - { - return started.get(); - } - - @Override - public DruidServer getInventoryValue(String containerKey) - { - return inventoryManager.getInventoryValue(containerKey); - } - - @Override - public Iterable getInventory() - { - return inventoryManager.getInventory(); - } - - @Override - public void registerServerCallback(Executor exec, ServerCallback callback) - { - serverCallbacks.put(callback, exec); - } - - @Override - public void registerSegmentCallback(Executor exec, SegmentCallback callback) - { - segmentCallbacks.put(callback, exec); - } - - public InventoryManagerConfig getInventoryManagerConfig() - { - return inventoryManager.getConfig(); - } - - protected void runSegmentCallbacks( - final Function fn - ) - { - for (final Map.Entry entry : segmentCallbacks.entrySet()) { - entry.getValue().execute( - new Runnable() - { - @Override - public void run() - { - if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) { - segmentCallbackRemoved(entry.getKey()); - segmentCallbacks.remove(entry.getKey()); - } - } - } - ); - } - } - - protected void runServerCallbacks(final DruidServer server) - { - for (final Map.Entry entry : serverCallbacks.entrySet()) { - entry.getValue().execute( - new Runnable() - { - @Override - public void run() - { - if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) { - serverCallbacks.remove(entry.getKey()); - } - } - } - ); - } - } - - protected void addSingleInventory( - final DruidServer container, - final DataSegment inventory - ) - { - log.debug("Server[%s] added segment[%s]", container.getName(), inventory.getIdentifier()); - - if (container.getSegment(inventory.getIdentifier()) != null) { - log.warn( - "Not adding or running callbacks for existing segment[%s] on server[%s]", - inventory.getIdentifier(), - container.getName() - ); - - return; - } - - container.addDataSegment(inventory.getIdentifier(), inventory); - - runSegmentCallbacks( - new Function() - { - @Override - public CallbackAction apply(SegmentCallback input) - { - return input.segmentAdded(container.getMetadata(), inventory); - } - } - ); - } - - protected void removeSingleInventory(final DruidServer container, String inventoryKey) - { - log.debug("Server[%s] removed segment[%s]", container.getName(), inventoryKey); - final DataSegment segment = container.getSegment(inventoryKey); - - if (segment == null) { - log.warn( - "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]", - inventoryKey, - container.getName() - ); - - return; - } - - container.removeDataSegment(inventoryKey); - - runSegmentCallbacks( - new Function() - { - @Override - public CallbackAction apply(SegmentCallback input) - { - return input.segmentRemoved(container.getMetadata(), segment); - } - } - ); - } - - protected abstract DruidServer addInnerInventory( - final DruidServer container, - String inventoryKey, - final InventoryType inventory - ); - - protected abstract DruidServer updateInnerInventory( - final DruidServer container, - String inventoryKey, - final InventoryType inventory - ); - - protected abstract DruidServer removeInnerInventory( - final DruidServer container, - String inventoryKey - ); - - protected abstract void segmentCallbackRemoved(SegmentCallback callback); } diff --git a/server/src/main/java/io/druid/client/ServerInventoryViewProvider.java b/server/src/main/java/io/druid/client/ServerInventoryViewProvider.java index 6a9c05e7288a..2a0f2f17ac96 100644 --- a/server/src/main/java/io/druid/client/ServerInventoryViewProvider.java +++ b/server/src/main/java/io/druid/client/ServerInventoryViewProvider.java @@ -28,7 +28,8 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = BatchServerInventoryViewProvider.class) @JsonSubTypes(value = { @JsonSubTypes.Type(name = "legacy", value = SingleServerInventoryProvider.class), - @JsonSubTypes.Type(name = "batch", value = BatchServerInventoryViewProvider.class) + @JsonSubTypes.Type(name = "batch", value = BatchServerInventoryViewProvider.class), + @JsonSubTypes.Type(name = "http", value = HttpServerInventoryViewProvider.class), }) public interface ServerInventoryViewProvider extends Provider { diff --git a/server/src/main/java/io/druid/client/SingleServerInventoryView.java b/server/src/main/java/io/druid/client/SingleServerInventoryView.java index 7031e3c4f1dc..f54d6a10ecb3 100644 --- a/server/src/main/java/io/druid/client/SingleServerInventoryView.java +++ b/server/src/main/java/io/druid/client/SingleServerInventoryView.java @@ -40,7 +40,7 @@ /** */ @ManageLifecycle -public class SingleServerInventoryView extends ServerInventoryView implements FilteredServerInventoryView +public class SingleServerInventoryView extends AbstractCuratorServerInventoryView implements FilteredServerInventoryView { private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class); diff --git a/server/src/main/java/io/druid/guice/AnnouncerModule.java b/server/src/main/java/io/druid/guice/AnnouncerModule.java index 8d1152ea7cad..838780503935 100644 --- a/server/src/main/java/io/druid/guice/AnnouncerModule.java +++ b/server/src/main/java/io/druid/guice/AnnouncerModule.java @@ -25,8 +25,10 @@ import io.druid.concurrent.Execs; import io.druid.curator.announcement.Announcer; import io.druid.server.coordination.BatchDataSegmentAnnouncer; +import io.druid.server.coordination.CuratorDataSegmentServerAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.server.coordination.DataSegmentAnnouncerProvider; +import io.druid.server.coordination.DataSegmentServerAnnouncer; import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; import org.apache.curator.framework.CuratorFramework; @@ -40,7 +42,8 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.announcer", BatchDataSegmentAnnouncerConfig.class); JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class); binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class); - binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleLast.class); + binder.bind(BatchDataSegmentAnnouncer.class).in(LazySingleton.class); + binder.bind(DataSegmentServerAnnouncer.class).to(CuratorDataSegmentServerAnnouncer.class).in(LazySingleton.class); } @Provides diff --git a/server/src/main/java/io/druid/guice/ServerViewModule.java b/server/src/main/java/io/druid/guice/ServerViewModule.java index e88b1dbc55fd..d2860d107734 100644 --- a/server/src/main/java/io/druid/guice/ServerViewModule.java +++ b/server/src/main/java/io/druid/guice/ServerViewModule.java @@ -23,6 +23,7 @@ import com.google.inject.Module; import io.druid.client.FilteredServerInventoryView; import io.druid.client.FilteredServerInventoryViewProvider; +import io.druid.client.HttpServerInventoryViewConfig; import io.druid.client.InventoryView; import io.druid.client.ServerInventoryView; import io.druid.client.ServerInventoryViewProvider; @@ -37,6 +38,7 @@ public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.announcer", ServerInventoryViewProvider.class); JsonConfigProvider.bind(binder, "druid.announcer", FilteredServerInventoryViewProvider.class); + JsonConfigProvider.bind(binder, "druid.announcer.http", HttpServerInventoryViewConfig.class); binder.bind(InventoryView.class).to(ServerInventoryView.class); binder.bind(ServerView.class).to(ServerInventoryView.class); binder.bind(ServerInventoryView.class).toProvider(ServerInventoryViewProvider.class).in(ManageLifecycle.class); diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index ac158b31c3d9..1fc4a6fe6e73 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -50,6 +50,7 @@ import io.druid.segment.realtime.plumber.Committers; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.Plumbers; +import io.druid.server.coordination.DataSegmentServerAnnouncer; import org.joda.time.Interval; import java.io.Closeable; @@ -66,6 +67,7 @@ public class RealtimeManager implements QuerySegmentWalker private final List fireDepartments; private final QueryRunnerFactoryConglomerate conglomerate; + private final DataSegmentServerAnnouncer serverAnnouncer; /** * key=data source name,value=mappings of partition number to FireChief @@ -75,29 +77,31 @@ public class RealtimeManager implements QuerySegmentWalker @Inject public RealtimeManager( List fireDepartments, - QueryRunnerFactoryConglomerate conglomerate + QueryRunnerFactoryConglomerate conglomerate, + DataSegmentServerAnnouncer serverAnnouncer ) { - this.fireDepartments = fireDepartments; - this.conglomerate = conglomerate; - - this.chiefs = Maps.newHashMap(); + this(fireDepartments, conglomerate, serverAnnouncer, Maps.newHashMap()); } RealtimeManager( List fireDepartments, QueryRunnerFactoryConglomerate conglomerate, + DataSegmentServerAnnouncer serverAnnouncer, Map> chiefs ) { this.fireDepartments = fireDepartments; this.conglomerate = conglomerate; + this.serverAnnouncer = serverAnnouncer; this.chiefs = chiefs; } @LifecycleStart public void start() throws IOException { + serverAnnouncer.announce(); + for (final FireDepartment fireDepartment : fireDepartments) { final DataSchema schema = fireDepartment.getDataSchema(); @@ -129,6 +133,8 @@ public void stop() CloseQuietly.close(chief); } } + + serverAnnouncer.unannounce(); } public FireDepartmentMetrics getMetrics(String datasource) diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java index f978221e9558..282a967011c1 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/Appenderators.java @@ -112,12 +112,6 @@ public void unannounceSegments(Iterable segments) throws IOExceptio { // Do nothing } - - @Override - public boolean isAnnounced(DataSegment segment) - { - return false; - } }, null, null, diff --git a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java index 08e3bca4cbfd..bcbe99237982 100644 --- a/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -24,10 +24,12 @@ import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import com.google.inject.Inject; - import io.druid.common.utils.UUIDUtils; import io.druid.curator.announcement.Announcer; import io.druid.java.util.common.ISE; @@ -38,8 +40,11 @@ import org.apache.curator.utils.ZKPaths; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentSkipListSet; @@ -47,7 +52,7 @@ /** */ -public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer +public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer { private static final Logger log = new Logger(BatchDataSegmentAnnouncer.class); @@ -64,6 +69,9 @@ public class BatchDataSegmentAnnouncer extends AbstractDataSegmentAnnouncer private final Map segmentLookup = Maps.newConcurrentMap(); private final Function segmentTransformer; + private final SegmentChangeRequestHistory changes = new SegmentChangeRequestHistory(); + private final SegmentZNode dummyZnode; + @Inject public BatchDataSegmentAnnouncer( DruidServerMetadata server, @@ -73,7 +81,6 @@ public BatchDataSegmentAnnouncer( ObjectMapper jsonMapper ) { - super(server, zkPaths, announcer, jsonMapper); this.config = config; this.announcer = announcer; this.jsonMapper = jsonMapper; @@ -95,18 +102,37 @@ public DataSegment apply(DataSegment input) return rv; } }; + + if (this.config.isSkipSegmentAnnouncementOnZk()) { + dummyZnode = new SegmentZNode("PLACE_HOLDER_ONLY"); + } else { + dummyZnode = null; + } } @Override public void announceSegment(DataSegment segment) throws IOException { - DataSegment toAnnounce = segmentTransformer.apply(segment); - int newBytesLen = jsonMapper.writeValueAsBytes(toAnnounce).length; - if (newBytesLen > config.getMaxBytesPerNode()) { - throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxBytesPerNode()); + if (segmentLookup.containsKey(segment)) { + log.info("Skipping announcement of segment [%s]. Announcement exists already."); + return; } + DataSegment toAnnounce = segmentTransformer.apply(segment); + synchronized (lock) { + changes.addSegmentChangeRequest(new SegmentChangeRequestLoad(toAnnounce)); + + if (config.isSkipSegmentAnnouncementOnZk()) { + segmentLookup.put(segment, dummyZnode); + return; + } + + int newBytesLen = jsonMapper.writeValueAsBytes(toAnnounce).length; + if (newBytesLen > config.getMaxBytesPerNode()) { + throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxBytesPerNode()); + } + boolean done = false; if (!availableZNodes.isEmpty()) { // update existing batch @@ -155,13 +181,20 @@ public void announceSegment(DataSegment segment) throws IOException @Override public void unannounceSegment(DataSegment segment) throws IOException { - final SegmentZNode segmentZNode = segmentLookup.remove(segment); - if (segmentZNode == null) { - log.warn("No path to unannounce segment[%s]", segment.getIdentifier()); - return; - } - synchronized (lock) { + final SegmentZNode segmentZNode = segmentLookup.remove(segment); + + if (segmentZNode == null) { + log.warn("No path to unannounce segment[%s]", segment.getIdentifier()); + return; + } + + changes.addSegmentChangeRequest(new SegmentChangeRequestDrop(segment)); + + if (config.isSkipSegmentAnnouncementOnZk()) { + return; + } + segmentZNode.removeSegment(segment); log.info("Unannouncing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath()); @@ -178,38 +211,60 @@ public void unannounceSegment(DataSegment segment) throws IOException @Override public void announceSegments(Iterable segments) throws IOException { - Iterable toAnnounce = Iterables.transform(segments, segmentTransformer); SegmentZNode segmentZNode = new SegmentZNode(makeServedSegmentPath()); Set batch = Sets.newHashSet(); + List changesBatch = new ArrayList<>(); + int byteSize = 0; int count = 0; - for (DataSegment segment : toAnnounce) { - int newBytesLen = jsonMapper.writeValueAsBytes(segment).length; + synchronized (lock) { + for (DataSegment ds : segments) { - if (newBytesLen > config.getMaxBytesPerNode()) { - throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxBytesPerNode()); - } + if (segmentLookup.containsKey(ds)) { + log.info("Skipping announcement of segment [%s]. Announcement exists already."); + return; + } - if (count >= config.getSegmentsPerNode() || byteSize + newBytesLen > config.getMaxBytesPerNode()) { - segmentZNode.addSegments(batch); - announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes()); + DataSegment segment = segmentTransformer.apply(ds); - segmentZNode = new SegmentZNode(makeServedSegmentPath()); - batch = Sets.newHashSet(); - count = 0; - byteSize = 0; - } + changesBatch.add(new SegmentChangeRequestLoad(segment)); - log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath()); - segmentLookup.put(segment, segmentZNode); - batch.add(segment); - count++; - byteSize += newBytesLen; + if (config.isSkipSegmentAnnouncementOnZk()) { + segmentLookup.put(segment, dummyZnode); + continue; + } + + int newBytesLen = jsonMapper.writeValueAsBytes(segment).length; + + if (newBytesLen > config.getMaxBytesPerNode()) { + throw new ISE("byte size %,d exceeds %,d", newBytesLen, config.getMaxBytesPerNode()); + } + + if (count >= config.getSegmentsPerNode() || byteSize + newBytesLen > config.getMaxBytesPerNode()) { + segmentZNode.addSegments(batch); + announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes()); + + segmentZNode = new SegmentZNode(makeServedSegmentPath()); + batch = Sets.newHashSet(); + count = 0; + byteSize = 0; + } + + log.info("Announcing segment[%s] at path[%s]", segment.getIdentifier(), segmentZNode.getPath()); + segmentLookup.put(segment, segmentZNode); + batch.add(segment); + count++; + byteSize += newBytesLen; + } } - segmentZNode.addSegments(batch); - announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes()); + changes.addSegmentChangeRequests(changesBatch); + + if (!config.isSkipSegmentAnnouncementOnZk()) { + segmentZNode.addSegments(batch); + announcer.announce(segmentZNode.getPath(), segmentZNode.getBytes()); + } } @Override @@ -220,21 +275,46 @@ public void unannounceSegments(Iterable segments) throws IOExceptio } } - @Override - public boolean isAnnounced(DataSegment segment) + /** + * Returns Future that lists the segment load/drop requests since given counter. + */ + public ListenableFuture getSegmentChangesSince(SegmentChangeRequestHistory.Counter counter) { - return segmentLookup.containsKey(segment); + if (counter.getCounter() < 0) { + synchronized (lock) { + Iterable segments = Iterables.transform( + segmentLookup.keySet(), + new Function() + { + @Nullable + @Override + public SegmentChangeRequestLoad apply(DataSegment input) + { + return new SegmentChangeRequestLoad(input); + } + } + ); + + SettableFuture future = SettableFuture.create(); + future.set(SegmentChangeRequestsSnapshot.success(changes.getLastCounter(), Lists.newArrayList(segments))); + return future; + } + } else { + return changes.getRequestsSince(counter); + } } private String makeServedSegmentPath() { // server.getName() is already in the zk path - return makeServedSegmentPath(UUIDUtils.generateUuid( - server.getHost(), - server.getType(), - server.getTier(), - new DateTime().toString() - )); + return makeServedSegmentPath( + UUIDUtils.generateUuid( + server.getHost(), + server.getType(), + server.getTier(), + new DateTime().toString() + ) + ); } private String makeServedSegmentPath(String zNode) diff --git a/server/src/main/java/io/druid/server/coordination/AbstractDataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java similarity index 77% rename from server/src/main/java/io/druid/server/coordination/AbstractDataSegmentAnnouncer.java rename to server/src/main/java/io/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java index cdcc5bbf5766..6b2af0f6d82f 100644 --- a/server/src/main/java/io/druid/server/coordination/AbstractDataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java @@ -22,19 +22,17 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; - +import com.google.inject.Inject; import io.druid.curator.announcement.Announcer; -import io.druid.java.util.common.lifecycle.LifecycleStart; -import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.java.util.common.logger.Logger; import io.druid.server.initialization.ZkPathsConfig; import org.apache.curator.utils.ZKPaths; /** */ -public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnouncer +public class CuratorDataSegmentServerAnnouncer implements DataSegmentServerAnnouncer { - private static final Logger log = new Logger(AbstractDataSegmentAnnouncer.class); + private static final Logger log = new Logger(CuratorDataSegmentServerAnnouncer.class); private final DruidServerMetadata server; private final ZkPathsConfig config; @@ -43,9 +41,10 @@ public abstract class AbstractDataSegmentAnnouncer implements DataSegmentAnnounc private final Object lock = new Object(); - private volatile boolean started = false; + private volatile boolean announced = false; - protected AbstractDataSegmentAnnouncer( + @Inject + public CuratorDataSegmentServerAnnouncer( DruidServerMetadata server, ZkPathsConfig config, Announcer announcer, @@ -58,11 +57,11 @@ protected AbstractDataSegmentAnnouncer( this.jsonMapper = jsonMapper; } - @LifecycleStart - public void start() + @Override + public void announce() { synchronized (lock) { - if (started) { + if (announced) { return; } @@ -75,22 +74,23 @@ public void start() throw Throwables.propagate(e); } - started = true; + announced = true; } } - @LifecycleStop - public void stop() + @Override + public void unannounce() { synchronized (lock) { - if (!started) { + if (!announced) { return; } - log.info("Stopping %s with config[%s]", getClass(), config); - announcer.unannounce(makeAnnouncementPath()); + final String path = makeAnnouncementPath(); + log.info("Unannouncing self[%s] at [%s]", server, path); + announcer.unannounce(path); - started = false; + announced = false; } } diff --git a/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncer.java b/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncer.java index ef6eafab6a7c..d6e8c4a27a8d 100644 --- a/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncer.java +++ b/server/src/main/java/io/druid/server/coordination/DataSegmentAnnouncer.java @@ -32,9 +32,4 @@ public interface DataSegmentAnnouncer public void announceSegments(Iterable segments) throws IOException; public void unannounceSegments(Iterable segments) throws IOException; - - /** - * @return true if the segment was already announced, otherwise false - */ - public boolean isAnnounced(DataSegment segment); } diff --git a/server/src/main/java/io/druid/server/coordination/DataSegmentServerAnnouncer.java b/server/src/main/java/io/druid/server/coordination/DataSegmentServerAnnouncer.java new file mode 100644 index 000000000000..791c62c57a70 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordination/DataSegmentServerAnnouncer.java @@ -0,0 +1,28 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordination; + +/** + */ +public interface DataSegmentServerAnnouncer +{ + void announce(); + void unannounce(); +} diff --git a/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestHistory.java b/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestHistory.java new file mode 100644 index 000000000000..e8389b93ece8 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestHistory.java @@ -0,0 +1,354 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordination; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.AbstractFuture; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.druid.common.utils.StringUtils; +import io.druid.java.util.common.IAE; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * This class keeps a bounded list of segment updates made on the server such as adding/dropping segments. + * + * Clients call addSegmentChangeRequest(DataSegmentChangeRequest) or addSegmentChangeRequests(List) + * to add segment updates. + * + * Clients call ListenableFuture getRequestsSince(final Counter counter) to get segment + * updates since given counter. + */ +public class SegmentChangeRequestHistory +{ + private static int MAX_SIZE = 1000; + + private final int maxSize; + + private final CircularBuffer changes; + + @VisibleForTesting + final LinkedHashMap waitingFutures; + + private final ExecutorService singleThreadedExecutor; + private final Runnable resolveWaitingFuturesRunnable; + + public SegmentChangeRequestHistory() + { + this(MAX_SIZE); + } + + public SegmentChangeRequestHistory(int maxSize) + { + this.maxSize = maxSize; + this.changes = new CircularBuffer(maxSize); + + this.waitingFutures = new LinkedHashMap<>(); + + this.resolveWaitingFuturesRunnable = new Runnable() + { + @Override + public void run() + { + resolveWaitingFutures(); + } + }; + + this.singleThreadedExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat( + "SegmentChangeRequestHistory" + ) + .build() + ); + } + + + + /** + * Add batch of segment changes update. + */ + public synchronized void addSegmentChangeRequests(List requests) + { + for (DataSegmentChangeRequest request : requests) { + changes.add(new Holder(request, getLastCounter().inc())); + } + + singleThreadedExecutor.execute(resolveWaitingFuturesRunnable); + } + + /** + * Add single segment change update. + */ + public synchronized void addSegmentChangeRequest(DataSegmentChangeRequest request) + { + addSegmentChangeRequests(ImmutableList.of(request)); + } + + /** + * Returns a Future that , on completion, returns list of segment updates and associated counter. + * If there are no update since given counter then Future completion waits till an updates is provided. + * + * If counter is older than max number of changes maintained then SegmentChangeRequestsSnapshot is returned + * with resetCounter set to True. + * + * If there were no updates to provide immediately then a future is created and returned to caller. This future + * is added to the "waitingFutures" list and all the futures in the list get resolved as soon as a segment + * update is provided. + */ + public synchronized ListenableFuture getRequestsSince(final Counter counter) + { + final CustomSettableFuture future = new CustomSettableFuture(waitingFutures); + + if (counter.counter < 0) { + future.setException(new IAE("counter[%s] must be >= 0", counter)); + return future; + } + + Counter lastCounter = getLastCounter(); + + if (counter.counter == lastCounter.counter) { + if (!counter.matches(lastCounter)) { + future.setException(new IAE("counter[%s] failed to match with [%s]", counter, lastCounter)); + } else { + synchronized (waitingFutures) { + waitingFutures.put(future, counter); + } + } + } else { + try { + future.set(getRequestsSinceWithoutWait(counter)); + } catch (Exception ex) { + future.setException(ex); + } + } + + return future; + } + + private synchronized SegmentChangeRequestsSnapshot getRequestsSinceWithoutWait(final Counter counter) + { + Counter lastCounter = getLastCounter(); + + if (counter.counter >= lastCounter.counter) { + throw new IAE("counter[%s] >= last counter[%s]", counter, lastCounter); + } else if (lastCounter.counter - counter.counter >= maxSize) { + // Note: counter reset is requested when client ask for "maxSize" number of changes even if all those changes + // are present in the history because one extra elements is needed to match the counter hash. + return SegmentChangeRequestsSnapshot.fail( + StringUtils.safeFormat( + "can't serve request, not enough history is kept. given counter [%s] and current last counter [%s]", + counter, + lastCounter + ) + ); + } else { + int changeStartIndex = (int) (counter.counter + changes.size() - lastCounter.counter); + + Counter counterToMatch = counter.counter == 0 ? Counter.ZERO : changes.get(changeStartIndex - 1).counter; + if (!counterToMatch.matches(counter)) { + throw new IAE("counter[%s] failed to match with [%s]", counter, counterToMatch); + } + + List result = new ArrayList<>(); + for (int i = changeStartIndex; i < changes.size(); i++) { + result.add(changes.get(i).changeRequest); + } + + return SegmentChangeRequestsSnapshot.success(changes.get(changes.size() - 1).counter, result); + } + } + + private void resolveWaitingFutures() + { + final LinkedHashMap waitingFuturesCopy = new LinkedHashMap<>(); + synchronized (waitingFutures) { + waitingFuturesCopy.putAll(waitingFutures); + waitingFutures.clear(); + } + + for (Map.Entry e : waitingFuturesCopy.entrySet()) { + try { + e.getKey().set(getRequestsSinceWithoutWait(e.getValue())); + } catch (Exception ex) { + e.getKey().setException(ex); + } + } + } + + public synchronized Counter getLastCounter() + { + if (changes.size() > 0) { + return changes.get(changes.size() - 1).counter; + } else { + return Counter.ZERO; + } + } + + private static class Holder + { + private final DataSegmentChangeRequest changeRequest; + private final Counter counter; + + public Holder(DataSegmentChangeRequest changeRequest, Counter counter) + { + this.changeRequest = changeRequest; + this.counter = counter; + } + } + + public static class Counter + { + public static final Counter ZERO = new Counter(0); + + private final long counter; + private final long hash; + + public Counter(long counter) + { + this(counter, System.currentTimeMillis()); + } + + @JsonCreator + public Counter( + @JsonProperty("counter") long counter, + @JsonProperty("hash") long hash + ) + { + this.counter = counter; + this.hash = hash; + } + + @JsonProperty + public long getCounter() + { + return counter; + } + + @JsonProperty + public long getHash() + { + return hash; + } + + public Counter inc() + { + return new Counter(counter + 1); + } + + public boolean matches(Counter other) + { + return this.counter == other.counter && this.hash == other.hash; + } + + @Override + public String toString() + { + return "Counter{" + + "counter=" + counter + + ", hash=" + hash + + '}'; + } + } + + // Future with cancel() implementation to remove it from "waitingFutures" list + private static class CustomSettableFuture extends AbstractFuture + { + private final LinkedHashMap waitingFutures; + + private CustomSettableFuture(LinkedHashMap waitingFutures) + { + this.waitingFutures = waitingFutures; + } + + @Override + public boolean set(SegmentChangeRequestsSnapshot value) + { + return super.set(value); + } + + @Override + public boolean setException(Throwable throwable) + { + return super.setException(throwable); + } + + @Override + public boolean cancel(boolean interruptIfRunning) + { + synchronized (waitingFutures) { + waitingFutures.remove(this); + } + return true; + } + } + + static class CircularBuffer + { + private final E[] buffer; + + private int start = 0; + private int size = 0; + + CircularBuffer(int capacity) + { + buffer = (E[]) new Object[capacity]; + } + + void add(E item) + { + buffer[start++] = item; + + if (start >= buffer.length) { + start = 0; + } + + if (size < buffer.length) { + size++; + } + } + + E get(int index) + { + Preconditions.checkArgument(index >= 0 && index < size, "invalid index"); + + int bufferIndex = (start-size+index) % buffer.length; + if (bufferIndex < 0) { + bufferIndex += buffer.length; + } + return buffer[bufferIndex]; + } + + int size() + { + return size; + } + } +} diff --git a/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestsSnapshot.java b/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestsSnapshot.java new file mode 100644 index 000000000000..893e04e474ce --- /dev/null +++ b/server/src/main/java/io/druid/server/coordination/SegmentChangeRequestsSnapshot.java @@ -0,0 +1,108 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordination; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import java.util.List; + +/** + * Return type of SegmentChangeRequestHistory.getRequestsSince(counter). + */ +public class SegmentChangeRequestsSnapshot +{ + //if true, that means caller should reset the counter and request again. + private final boolean resetCounter; + + //cause for reset if resetCounter is true + private final String resetCause; + + //segments requests delta since counter, if resetCounter if false + private final SegmentChangeRequestHistory.Counter counter; + private final List requests; + + @JsonCreator + public SegmentChangeRequestsSnapshot( + @JsonProperty("resetCounter") boolean resetCounter, + @JsonProperty("resetCause") String resetCause, + @JsonProperty("counter") SegmentChangeRequestHistory.Counter counter, + @JsonProperty("requests") List requests + ) + { + this.resetCounter = resetCounter; + this.resetCause = resetCause; + + if (resetCounter) { + Preconditions.checkNotNull(resetCause, "NULL resetCause when resetCounter is true."); + } + + + this.counter = counter; + this.requests = requests; + } + + public static SegmentChangeRequestsSnapshot success(SegmentChangeRequestHistory.Counter counter, + List requests) + { + return new SegmentChangeRequestsSnapshot(false, null, counter, requests); + } + + public static SegmentChangeRequestsSnapshot fail(String resetCause) + { + return new SegmentChangeRequestsSnapshot(true, resetCause, null, null); + } + + @JsonProperty + public boolean isResetCounter() + { + return resetCounter; + } + + @JsonProperty + public String getResetCause() + { + return resetCause; + } + + @JsonProperty + public SegmentChangeRequestHistory.Counter getCounter() + { + return counter; + } + + @JsonProperty + public List getRequests() + { + return requests; + } + + @Override + public String toString() + { + return "SegmentChangeRequestsSnapshot{" + + "resetCounter=" + resetCounter + + ", resetCause='" + resetCause + '\'' + + ", counter=" + counter + + ", requests=" + requests + + '}'; + } +} diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java index 51f7e48188f9..baa37403d659 100644 --- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java +++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java @@ -72,6 +72,7 @@ public class ZkCoordinator implements DataSegmentChangeHandler private final DruidServerMetadata me; private final CuratorFramework curator; private final DataSegmentAnnouncer announcer; + private final DataSegmentServerAnnouncer serverAnnouncer; private final ServerManager serverManager; private final ScheduledExecutorService exec; private final ConcurrentSkipListSet segmentsToDelete; @@ -87,6 +88,7 @@ public ZkCoordinator( ZkPathsConfig zkPaths, DruidServerMetadata me, DataSegmentAnnouncer announcer, + DataSegmentServerAnnouncer serverAnnouncer, CuratorFramework curator, ServerManager serverManager, ScheduledExecutorFactory factory @@ -98,6 +100,7 @@ public ZkCoordinator( this.me = me; this.curator = curator; this.announcer = announcer; + this.serverAnnouncer = serverAnnouncer; this.serverManager = serverManager; this.exec = factory.create(1, "ZkCoordinator-Exec--%d"); @@ -132,6 +135,7 @@ public void start() throws IOException curator.newNamespaceAwareEnsurePath(liveSegmentsLocation).ensure(curator.getZookeeperClient()); loadLocalCache(); + serverAnnouncer.announce(); loadQueueCache.getListenable().addListener( new PathChildrenCacheListener() @@ -226,6 +230,7 @@ public void stop() try { loadQueueCache.close(); + serverAnnouncer.unannounce(); } catch (Exception e) { throw Throwables.propagate(e); @@ -360,13 +365,11 @@ each time when addSegment() is called, it has to wait for the lock in order to m } } loadSegment(segment, callback); - if (!announcer.isAnnounced(segment)) { - try { - announcer.announceSegment(segment); - } - catch (IOException e) { - throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); - } + try { + announcer.announceSegment(segment); + } + catch (IOException e) { + throw new SegmentLoadingException(e, "Failed to announce segment[%s]", segment.getIdentifier()); } } catch (SegmentLoadingException e) { @@ -408,14 +411,12 @@ public void run() segment.getIdentifier() ); loadSegment(segment, callback); - if (!announcer.isAnnounced(segment)) { - try { - backgroundSegmentAnnouncer.announceSegment(segment); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new SegmentLoadingException(e, "Loading Interrupted"); - } + try { + backgroundSegmentAnnouncer.announceSegment(segment); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new SegmentLoadingException(e, "Loading Interrupted"); } } catch (SegmentLoadingException e) { diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java index b36f0061049c..86636d7eab9f 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinator.java @@ -117,7 +117,7 @@ public Interval apply(DataSegment segment) private final ZkPathsConfig zkPaths; private final JacksonConfigManager configManager; private final MetadataSegmentManager metadataSegmentManager; - private final ServerInventoryView serverInventoryView; + private final ServerInventoryView serverInventoryView; private final MetadataRuleManager metadataRuleManager; private final CuratorFramework curator; private final ServiceEmitter emitter; @@ -362,9 +362,9 @@ public String getCurrentLeader() } public void moveSegment( - ImmutableDruidServer fromServer, - ImmutableDruidServer toServer, - String segmentName, + final ImmutableDruidServer fromServer, + final ImmutableDruidServer toServer, + final String segmentName, final LoadPeonCallback callback ) { @@ -405,10 +405,6 @@ public void moveSegment( toServer.getName() ), segmentName ); - final String toServedSegPath = ZKPaths.makePath( - ZKPaths.makePath(serverInventoryView.getInventoryManagerConfig().getInventoryPath(), toServer.getName()), - segmentName - ); loadPeon.loadSegment( segment, @@ -418,7 +414,7 @@ public void moveSegment( public void execute() { try { - if (curator.checkExists().forPath(toServedSegPath) != null && + if (serverInventoryView.isSegmentLoadedByServer(toServer.getName(), segment) && curator.checkExists().forPath(toLoadQueueSegPath) == null && !dropPeon.getSegmentsToDrop().contains(segment)) { dropPeon.dropSegment(segment, callback); diff --git a/server/src/main/java/io/druid/server/http/SegmentListerResource.java b/server/src/main/java/io/druid/server/http/SegmentListerResource.java new file mode 100644 index 000000000000..c24ec4c60a1c --- /dev/null +++ b/server/src/main/java/io/druid/server/http/SegmentListerResource.java @@ -0,0 +1,229 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.http; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.inject.Inject; +import com.metamx.emitter.EmittingLogger; +import com.sun.jersey.spi.container.ResourceFilters; +import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.Smile; +import io.druid.server.coordination.BatchDataSegmentAnnouncer; +import io.druid.server.coordination.SegmentChangeRequestHistory; +import io.druid.server.coordination.SegmentChangeRequestsSnapshot; +import io.druid.server.http.security.StateResourceFilter; +import io.druid.server.security.AuthConfig; + +import javax.annotation.Nullable; +import javax.servlet.AsyncContext; +import javax.servlet.AsyncEvent; +import javax.servlet.AsyncListener; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import javax.ws.rs.core.MediaType; +import java.io.IOException; + +/** + */ +@Path("/druid-internal/v1/segments/") +@ResourceFilters(StateResourceFilter.class) +public class SegmentListerResource +{ + protected static final EmittingLogger log = new EmittingLogger(SegmentListerResource.class); + + protected final ObjectMapper jsonMapper; + protected final ObjectMapper smileMapper; + protected final AuthConfig authConfig; + private final BatchDataSegmentAnnouncer announcer; + + @Inject + public SegmentListerResource( + @Json ObjectMapper jsonMapper, + @Smile ObjectMapper smileMapper, + AuthConfig authConfig, + @Nullable BatchDataSegmentAnnouncer announcer + ) + { + this.jsonMapper = jsonMapper; + this.smileMapper = smileMapper; + this.authConfig = authConfig; + this.announcer = announcer; + } + + /** + * This endpoint is used by HttpServerInventoryView to keep an up-to-date list of segments served by + * historical/realtime nodes. + * + * This endpoint lists segments served by this server and can also incrementally provide the segments added/dropped + * since last response. + * + * Here is how, this is used. + * + * (1) Client sends first request /druid/internal/v1/segments?counter=-1&timeout= + * Server responds with list of segments currently served and a pair. + * + * (2) Client sends subsequent requests /druid/internal/v1/segments?counter=&hash=&timeout= + * Where values are used from the last response. Server responds with list of segment updates + * since given counter. + * + * This endpoint makes the client wait till either there is some segment update or given timeout elapses. + * + * So, clients keep on sending next request immediately after receiving the response in order to keep the list + * of segments served by this server up-to-date. + * + * @param counter counter received in last response. + * @param hash hash received in last response. + * @param timeout after which response is sent even if there are no new segment updates. + * @param req + * @throws IOException + */ + @GET + @Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) + @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) + public void getSegments( + @QueryParam("counter") long counter, + @QueryParam("hash") long hash, + @QueryParam("timeout") long timeout, + @Context final HttpServletRequest req + ) throws IOException + { + if (announcer == null) { + sendErrorResponse(req, HttpServletResponse.SC_NOT_FOUND, "announcer is not available."); + return; + } + + if (timeout <= 0) { + sendErrorResponse(req, HttpServletResponse.SC_BAD_REQUEST, "timeout must be positive."); + return; + } + + final ResponseContext context = createContext(req.getHeader("Accept")); + final ListenableFuture future = announcer.getSegmentChangesSince( + new SegmentChangeRequestHistory.Counter( + counter, + hash + ) + ); + + final AsyncContext asyncContext = req.startAsync(); + + asyncContext.addListener( + new AsyncListener() + { + @Override + public void onComplete(AsyncEvent event) throws IOException + { + } + + @Override + public void onTimeout(AsyncEvent event) throws IOException + { + + // HTTP 204 NO_CONTENT is sent to the client. + future.cancel(true); + event.getAsyncContext().complete(); + } + + @Override + public void onError(AsyncEvent event) throws IOException + { + } + + @Override + public void onStartAsync(AsyncEvent event) throws IOException + { + } + } + ); + + Futures.addCallback( + future, + new FutureCallback() + { + @Override + public void onSuccess(SegmentChangeRequestsSnapshot result) + { + try { + HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); + response.setStatus(HttpServletResponse.SC_OK); + context.inputMapper.writeValue(asyncContext.getResponse().getOutputStream(), result); + asyncContext.complete(); + } + catch (Exception ex) { + log.debug(ex, "Request timed out or closed already."); + } + } + + @Override + public void onFailure(Throwable th) + { + try { + HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); + if (th instanceof IllegalArgumentException) { + response.sendError(HttpServletResponse.SC_BAD_REQUEST, th.getMessage()); + } else { + response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, th.getMessage()); + } + asyncContext.complete(); + } + catch (Exception ex) { + log.debug(ex, "Request timed out or closed already."); + } + } + } + ); + + asyncContext.setTimeout(timeout); + } + + private void sendErrorResponse(HttpServletRequest req, int code, String error) throws IOException + { + AsyncContext asyncContext = req.startAsync(); + HttpServletResponse response = (HttpServletResponse) asyncContext.getResponse(); + response.sendError(code, error); + asyncContext.complete(); + } + + private ResponseContext createContext(String requestType) + { + boolean isSmile = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(requestType); + return new ResponseContext(isSmile ? smileMapper : jsonMapper); + } + + private static class ResponseContext + { + private final ObjectMapper inputMapper; + + ResponseContext(ObjectMapper inputMapper) + { + this.inputMapper = inputMapper; + } + } +} diff --git a/server/src/main/java/io/druid/server/initialization/BatchDataSegmentAnnouncerConfig.java b/server/src/main/java/io/druid/server/initialization/BatchDataSegmentAnnouncerConfig.java index ce7e95521964..c9c6f1f1d982 100644 --- a/server/src/main/java/io/druid/server/initialization/BatchDataSegmentAnnouncerConfig.java +++ b/server/src/main/java/io/druid/server/initialization/BatchDataSegmentAnnouncerConfig.java @@ -45,6 +45,9 @@ public class BatchDataSegmentAnnouncerConfig @JsonProperty private boolean skipDimensionsAndMetrics = false; + @JsonProperty + private boolean skipSegmentAnnouncementOnZk = false; + public int getSegmentsPerNode() { return segmentsPerNode; @@ -65,4 +68,8 @@ public boolean isSkipDimensionsAndMetrics() return skipDimensionsAndMetrics; } + public boolean isSkipSegmentAnnouncementOnZk() + { + return skipSegmentAnnouncementOnZk; + } } diff --git a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java index 91143f8920bb..89b5c6acf9d8 100644 --- a/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java +++ b/server/src/test/java/io/druid/client/CoordinatorServerViewTest.java @@ -60,7 +60,7 @@ public class CoordinatorServerViewTest extends CuratorTestBase private CountDownLatch segmentAddedLatch; private CountDownLatch segmentRemovedLatch; - private ServerInventoryView baseView; + private BatchServerInventoryView baseView; private CoordinatorServerView overlordServerView; public CoordinatorServerViewTest() diff --git a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java index 3355586fe912..39119987a09e 100644 --- a/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/io/druid/client/client/BatchServerInventoryViewTest.java @@ -30,7 +30,6 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; - import io.druid.client.BatchServerInventoryView; import io.druid.client.DruidServer; import io.druid.client.ServerView; @@ -41,6 +40,8 @@ import io.druid.java.util.common.Pair; import io.druid.java.util.common.guava.Comparators; import io.druid.server.coordination.BatchDataSegmentAnnouncer; +import io.druid.server.coordination.CuratorDataSegmentServerAnnouncer; +import io.druid.server.coordination.DataSegmentServerAnnouncer; import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; import io.druid.server.initialization.ZkPathsConfig; @@ -87,6 +88,7 @@ public class BatchServerInventoryViewTest private ObjectMapper jsonMapper; private Announcer announcer; private BatchDataSegmentAnnouncer segmentAnnouncer; + private DataSegmentServerAnnouncer serverAnnouncer; private Set testSegments; private BatchServerInventoryView batchServerInventoryView; private BatchServerInventoryView filteredBatchServerInventoryView; @@ -118,15 +120,34 @@ public void setUp() throws Exception ); announcer.start(); + DruidServerMetadata serverMetadata = new DruidServerMetadata( + "id", + "host", + Long.MAX_VALUE, + "type", + "tier", + 0 + ); + + ZkPathsConfig zkPathsConfig = new ZkPathsConfig() + { + @Override + public String getBase() + { + return testBasePath; + } + }; + + serverAnnouncer = new CuratorDataSegmentServerAnnouncer( + serverMetadata, + zkPathsConfig, + announcer, + jsonMapper + ); + serverAnnouncer.announce(); + segmentAnnouncer = new BatchDataSegmentAnnouncer( - new DruidServerMetadata( - "id", - "host", - Long.MAX_VALUE, - "type", - "tier", - 0 - ), + serverMetadata, new BatchDataSegmentAnnouncerConfig() { @Override @@ -135,18 +156,10 @@ public int getSegmentsPerNode() return 50; } }, - new ZkPathsConfig() - { - @Override - public String getBase() - { - return testBasePath; - } - }, + zkPathsConfig, announcer, jsonMapper ); - segmentAnnouncer.start(); testSegments = Sets.newConcurrentHashSet(); for (int i = 0; i < INITIAL_SEGMENTS; i++) { @@ -207,7 +220,7 @@ public void tearDown() throws Exception { batchServerInventoryView.stop(); filteredBatchServerInventoryView.stop(); - segmentAnnouncer.stop(); + serverAnnouncer.unannounce(); announcer.stop(); cf.close(); testingCluster.stop(); @@ -453,7 +466,6 @@ public String getBase() announcer, jsonMapper ); - segmentAnnouncer.start(); List segments = new ArrayList(); try { for (int j = 0; j < INITIAL_SEGMENTS / numThreads; ++j) { diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 2484030ae135..5954c23e7aee 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -70,8 +70,10 @@ import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.segment.realtime.plumber.Sink; +import io.druid.server.coordination.DataSegmentServerAnnouncer; import io.druid.timeline.partition.LinearShardSpec; import io.druid.utils.Runnables; +import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; @@ -215,7 +217,8 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException tuningConfig ) ), - null + null, + EasyMock.createNiceMock(DataSegmentServerAnnouncer.class) ); plumber2 = new TestPlumber(new Sink( new Interval("0/P5000Y"), @@ -234,7 +237,8 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws IOException tuningConfig ) ), - null + null, + EasyMock.createNiceMock(DataSegmentServerAnnouncer.class) ); tuningConfig_0 = new RealtimeTuningConfig( @@ -319,6 +323,7 @@ public void run() realtimeManager3 = new RealtimeManager( Arrays.asList(department_0, department_1), conglomerate, + EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), ImmutableMap.>of( "testing", ImmutableMap.of( diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index fba52b2eacf8..a692c5b170ad 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -235,12 +235,6 @@ public void unannounceSegments(Iterable segments) throws IOExceptio { } - - @Override - public boolean isAnnounced(DataSegment segment) - { - return false; - } }, emitter, queryExecutor, diff --git a/server/src/test/java/io/druid/server/coordination/SegmentChangeRequestHistoryTest.java b/server/src/test/java/io/druid/server/coordination/SegmentChangeRequestHistoryTest.java new file mode 100644 index 000000000000..fe2c92306f7c --- /dev/null +++ b/server/src/test/java/io/druid/server/coordination/SegmentChangeRequestHistoryTest.java @@ -0,0 +1,246 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordination; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + */ +public class SegmentChangeRequestHistoryTest +{ + @Test + public void testSimple() throws Exception + { + SegmentChangeRequestHistory history = new SegmentChangeRequestHistory(); + Assert.assertEquals(0, history.getLastCounter().getCounter()); + + history.addSegmentChangeRequest(new SegmentChangeRequestNoop()); + Assert.assertEquals(1, history.getLastCounter().getCounter()); + + SegmentChangeRequestsSnapshot snapshot = history.getRequestsSince(SegmentChangeRequestHistory.Counter.ZERO).get(); + Assert.assertEquals(1, snapshot.getRequests().size()); + Assert.assertEquals(1, snapshot.getCounter().getCounter()); + + history.addSegmentChangeRequest(new SegmentChangeRequestNoop()); + Assert.assertEquals(2, history.getLastCounter().getCounter()); + + snapshot = history.getRequestsSince(snapshot.getCounter()).get(); + Assert.assertEquals(1, snapshot.getRequests().size()); + Assert.assertEquals(2, snapshot.getCounter().getCounter()); + + snapshot = history.getRequestsSince(SegmentChangeRequestHistory.Counter.ZERO).get(); + Assert.assertEquals(2, snapshot.getRequests().size()); + Assert.assertEquals(2, snapshot.getCounter().getCounter()); + } + + @Test + public void testTruncatedHistory() throws Exception + { + SegmentChangeRequestHistory history = new SegmentChangeRequestHistory(2); + + history.addSegmentChangeRequest(new SegmentChangeRequestNoop()); + SegmentChangeRequestHistory.Counter one = history.getLastCounter(); + + history.addSegmentChangeRequest(new SegmentChangeRequestNoop()); + SegmentChangeRequestHistory.Counter two = history.getLastCounter(); + + history.addSegmentChangeRequest(new SegmentChangeRequestNoop()); + SegmentChangeRequestHistory.Counter three = history.getLastCounter(); + + history.addSegmentChangeRequest(new SegmentChangeRequestNoop()); + SegmentChangeRequestHistory.Counter four = history.getLastCounter(); + + Assert.assertTrue(history.getRequestsSince(SegmentChangeRequestHistory.Counter.ZERO).get().isResetCounter()); + Assert.assertTrue(history.getRequestsSince(one).get().isResetCounter()); + Assert.assertTrue(history.getRequestsSince(two).get().isResetCounter()); + + SegmentChangeRequestsSnapshot snapshot = history.getRequestsSince(three).get(); + Assert.assertEquals(1, snapshot.getRequests().size()); + Assert.assertEquals(4, snapshot.getCounter().getCounter()); + } + + @Test + public void testCounterHashMismatch() throws Exception + { + SegmentChangeRequestHistory history = new SegmentChangeRequestHistory(3); + + try { + history.getRequestsSince(new SegmentChangeRequestHistory.Counter(0, 1234)).get(); + Assert.fail(); + } catch (ExecutionException ex) { + Assert.assertTrue(ex.getCause() instanceof IllegalArgumentException); + } + + history.addSegmentChangeRequest(new SegmentChangeRequestNoop()); + SegmentChangeRequestHistory.Counter one = history.getLastCounter(); + + history.addSegmentChangeRequest(new SegmentChangeRequestNoop()); + SegmentChangeRequestHistory.Counter two = history.getLastCounter(); + + try { + history.getRequestsSince(new SegmentChangeRequestHistory.Counter(0, 1234)).get(); + Assert.fail(); + } catch (ExecutionException ex) { + Assert.assertTrue(ex.getCause() instanceof IllegalArgumentException); + } + + SegmentChangeRequestsSnapshot snapshot = history.getRequestsSince(one).get(); + Assert.assertEquals(1, snapshot.getRequests().size()); + Assert.assertEquals(2, snapshot.getCounter().getCounter()); + + try { + history.getRequestsSince(new SegmentChangeRequestHistory.Counter(1, 1234)).get(); + Assert.fail(); + } catch (ExecutionException ex) { + Assert.assertTrue(ex.getCause() instanceof IllegalArgumentException); + } + + history.addSegmentChangeRequest(new SegmentChangeRequestNoop()); + SegmentChangeRequestHistory.Counter three = history.getLastCounter(); + + history.addSegmentChangeRequest(new SegmentChangeRequestNoop()); + SegmentChangeRequestHistory.Counter four = history.getLastCounter(); + + snapshot = history.getRequestsSince(two).get(); + Assert.assertEquals(2, snapshot.getRequests().size()); + Assert.assertEquals(4, snapshot.getCounter().getCounter()); + + try { + history.getRequestsSince(new SegmentChangeRequestHistory.Counter(2, 1234)).get(); + Assert.fail(); + } catch (ExecutionException ex) { + Assert.assertTrue(ex.getCause() instanceof IllegalArgumentException); + } + } + + @Test + public void testCancel() throws Exception + { + final SegmentChangeRequestHistory history = new SegmentChangeRequestHistory(); + + ListenableFuture future = history.getRequestsSince( + SegmentChangeRequestHistory.Counter.ZERO + ); + Assert.assertEquals(1, history.waitingFutures.size()); + + final AtomicBoolean callbackExcecuted = new AtomicBoolean(false); + Futures.addCallback( + future, + new FutureCallback() + { + @Override + public void onSuccess(SegmentChangeRequestsSnapshot result) + { + callbackExcecuted.set(true); + } + + @Override + public void onFailure(Throwable t) + { + callbackExcecuted.set(true); + } + } + ); + + future.cancel(true); + Assert.assertEquals(0, history.waitingFutures.size()); + Assert.assertFalse(callbackExcecuted.get()); + } + + @Test + public void testNonImmediateFuture() throws Exception + { + final SegmentChangeRequestHistory history = new SegmentChangeRequestHistory(); + + Future future = history.getRequestsSince( + SegmentChangeRequestHistory.Counter.ZERO + ); + + Assert.assertFalse(future.isDone()); + + history.addSegmentChangeRequest(new SegmentChangeRequestNoop()); + + SegmentChangeRequestsSnapshot snapshot = future.get(1, TimeUnit.MINUTES); + Assert.assertEquals(1, snapshot.getCounter().getCounter()); + Assert.assertEquals(1, snapshot.getRequests().size()); + } + + @Test + public void testCircularBuffer() throws Exception + { + SegmentChangeRequestHistory.CircularBuffer circularBuffer = new SegmentChangeRequestHistory.CircularBuffer<>( + 3); + + circularBuffer.add(1); + Assert.assertEquals(1, circularBuffer.size()); + Assert.assertEquals(1, (int) circularBuffer.get(0)); + + circularBuffer.add(2); + Assert.assertEquals(2, circularBuffer.size()); + for (int i = 0; i < circularBuffer.size(); i++) { + Assert.assertEquals(i+1, (int) circularBuffer.get(i)); + } + + circularBuffer.add(3); + Assert.assertEquals(3, circularBuffer.size()); + for (int i = 0; i < circularBuffer.size(); i++) { + Assert.assertEquals(i+1, (int) circularBuffer.get(i)); + } + + circularBuffer.add(4); + Assert.assertEquals(3, circularBuffer.size()); + for (int i = 0; i < circularBuffer.size(); i++) { + Assert.assertEquals(i+2, (int) circularBuffer.get(i)); + } + + circularBuffer.add(5); + Assert.assertEquals(3, circularBuffer.size()); + for (int i = 0; i < circularBuffer.size(); i++) { + Assert.assertEquals(i+3, (int) circularBuffer.get(i)); + } + + circularBuffer.add(6); + Assert.assertEquals(3, circularBuffer.size()); + for (int i = 0; i < circularBuffer.size(); i++) { + Assert.assertEquals(i+4, (int) circularBuffer.get(i)); + } + + circularBuffer.add(7); + Assert.assertEquals(3, circularBuffer.size()); + for (int i = 0; i < circularBuffer.size(); i++) { + Assert.assertEquals(i+5, (int) circularBuffer.get(i)); + } + + circularBuffer.add(8); + Assert.assertEquals(3, circularBuffer.size()); + for (int i = 0; i < circularBuffer.size(); i++) { + Assert.assertEquals(i+6, (int) circularBuffer.get(i)); + } + } +} diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java index bfafb38c14c1..0ed78d24e2ac 100644 --- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java @@ -29,7 +29,6 @@ import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Module; - import io.druid.client.cache.CacheConfig; import io.druid.client.cache.LocalCacheProvider; import io.druid.concurrent.Execs; @@ -50,6 +49,7 @@ import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.apache.curator.framework.CuratorFramework; +import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; @@ -139,6 +139,7 @@ public String getBase() segmentsAnnouncedByMe = new ConcurrentSkipListSet<>(); announceCount = new AtomicInteger(0); + announcer = new DataSegmentAnnouncer() { private final DataSegmentAnnouncer delegate = new BatchDataSegmentAnnouncer( @@ -184,12 +185,6 @@ public void unannounceSegments(Iterable segments) throws IOExceptio announceCount.addAndGet(-Iterables.size(segments)); delegate.unannounceSegments(segments); } - - @Override - public boolean isAnnounced(DataSegment segment) - { - return segmentsAnnouncedByMe.contains(segment); - } }; zkCoordinator = new ZkCoordinator( @@ -223,6 +218,7 @@ public int getDropSegmentDelayMillis() zkPaths, me, announcer, + EasyMock.createNiceMock(DataSegmentServerAnnouncer.class), curator, serverManager, new ScheduledExecutorFactory() @@ -518,6 +514,7 @@ public String getBase() binder.bind(DruidServerMetadata.class) .toInstance(new DruidServerMetadata("dummyServer", "dummyHost", 0, "dummyType", "normal", 0)); binder.bind(DataSegmentAnnouncer.class).toInstance(announcer); + binder.bind(DataSegmentServerAnnouncer.class).toInstance(EasyMock.createNiceMock(DataSegmentServerAnnouncer.class)); binder.bind(CuratorFramework.class).toInstance(curator); binder.bind(ServerManager.class).toInstance(serverManager); binder.bind(ScheduledExecutorFactory.class).toInstance(ScheduledExecutors.createFactory(new Lifecycle())); diff --git a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java index db58a4e3edfb..ba6d8e1cd68f 100644 --- a/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java +++ b/server/src/test/java/io/druid/server/coordination/coordination/BatchDataSegmentAnnouncerTest.java @@ -33,6 +33,8 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.server.coordination.BatchDataSegmentAnnouncer; import io.druid.server.coordination.DruidServerMetadata; +import io.druid.server.coordination.SegmentChangeRequestHistory; +import io.druid.server.coordination.SegmentChangeRequestsSnapshot; import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig; import io.druid.server.initialization.ZkPathsConfig; import io.druid.timeline.DataSegment; @@ -145,7 +147,6 @@ public String getBase() announcer, jsonMapper ); - segmentAnnouncer.start(); testSegments = Sets.newHashSet(); for (int i = 0; i < 100; i++) { @@ -156,7 +157,6 @@ public String getBase() @After public void tearDown() throws Exception { - segmentAnnouncer.stop(); announcer.stop(); cf.close(); testingCluster.stop(); @@ -185,6 +185,12 @@ public void testSingleAnnounce() throws Exception Assert.assertEquals(Sets.newHashSet(firstSegment, secondSegment), segments); } + SegmentChangeRequestsSnapshot snapshot = segmentAnnouncer.getSegmentChangesSince( + new SegmentChangeRequestHistory.Counter(-1, -1) + ).get(); + Assert.assertEquals(2, snapshot.getRequests().size()); + Assert.assertEquals(2, snapshot.getCounter().getCounter()); + segmentAnnouncer.unannounceSegment(firstSegment); for (String zNode : zNodes) { @@ -195,6 +201,18 @@ public void testSingleAnnounce() throws Exception segmentAnnouncer.unannounceSegment(secondSegment); Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty()); + + snapshot = segmentAnnouncer.getSegmentChangesSince( + snapshot.getCounter() + ).get(); + Assert.assertEquals(2, snapshot.getRequests().size()); + Assert.assertEquals(4, snapshot.getCounter().getCounter()); + + snapshot = segmentAnnouncer.getSegmentChangesSince( + new SegmentChangeRequestHistory.Counter(-1, -1) + ).get(); + Assert.assertEquals(0, snapshot.getRequests().size()); + Assert.assertEquals(4, snapshot.getCounter().getCounter()); } @Test @@ -272,6 +290,11 @@ public void testSingleAnnounceManyTimes() throws Exception @Test public void testBatchAnnounce() throws Exception + { + testBatchAnnounce(true); + } + + private void testBatchAnnounce(boolean testHistory) throws Exception { segmentAnnouncer.announceSegments(testSegments); @@ -285,16 +308,40 @@ public void testBatchAnnounce() throws Exception } Assert.assertEquals(allSegments, testSegments); + SegmentChangeRequestsSnapshot snapshot = null; + + if (testHistory) { + snapshot = segmentAnnouncer.getSegmentChangesSince( + new SegmentChangeRequestHistory.Counter(-1, -1) + ).get(); + Assert.assertEquals(testSegments.size(), snapshot.getRequests().size()); + Assert.assertEquals(testSegments.size(), snapshot.getCounter().getCounter()); + } + segmentAnnouncer.unannounceSegments(testSegments); Assert.assertTrue(cf.getChildren().forPath(testSegmentsPath).isEmpty()); + + if (testHistory) { + snapshot = segmentAnnouncer.getSegmentChangesSince( + snapshot.getCounter() + ).get(); + Assert.assertEquals(testSegments.size(), snapshot.getRequests().size()); + Assert.assertEquals(2 * testSegments.size(), snapshot.getCounter().getCounter()); + + snapshot = segmentAnnouncer.getSegmentChangesSince( + new SegmentChangeRequestHistory.Counter(-1, -1) + ).get(); + Assert.assertEquals(0, snapshot.getRequests().size()); + Assert.assertEquals(2 * testSegments.size(), snapshot.getCounter().getCounter()); + } } @Test public void testMultipleBatchAnnounce() throws Exception { for (int i = 0; i < 10; i++) { - testBatchAnnounce(); + testBatchAnnounce(false); } } diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index 18ca9f423d05..3f4a6b5ba5d3 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -36,7 +36,6 @@ import io.druid.concurrent.Execs; import io.druid.curator.CuratorTestBase; import io.druid.curator.discovery.NoopServiceAnnouncer; -import io.druid.curator.inventory.InventoryManagerConfig; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.concurrent.ScheduledExecutorFactory; import io.druid.metadata.MetadataRuleManager; @@ -238,22 +237,6 @@ public void testMoveSegment() throws Exception loadManagementPeons.put("from", loadQueuePeon); loadManagementPeons.put("to", loadQueuePeon); - EasyMock.expect(serverInventoryView.getInventoryManagerConfig()).andReturn( - new InventoryManagerConfig() - { - @Override - public String getContainerPath() - { - return ""; - } - - @Override - public String getInventoryPath() - { - return ""; - } - } - ); EasyMock.replay(serverInventoryView); coordinator.moveSegment( diff --git a/services/src/main/java/io/druid/cli/CliHistorical.java b/services/src/main/java/io/druid/cli/CliHistorical.java index 6446e2ec208f..dfd44ba902bf 100644 --- a/services/src/main/java/io/druid/cli/CliHistorical.java +++ b/services/src/main/java/io/druid/cli/CliHistorical.java @@ -37,6 +37,7 @@ import io.druid.java.util.common.logger.Logger; import io.druid.query.QuerySegmentWalker; import io.druid.query.lookup.LookupModule; +import io.druid.server.http.SegmentListerResource; import io.druid.server.metrics.QueryCountStatsProvider; import io.druid.server.QueryResource; import io.druid.server.coordination.ServerManager; @@ -86,6 +87,7 @@ public void configure(Binder binder) binder.bind(QueryCountStatsProvider.class).to(QueryResource.class).in(LazySingleton.class); Jerseys.addResource(binder, QueryResource.class); Jerseys.addResource(binder, HistoricalResource.class); + Jerseys.addResource(binder, SegmentListerResource.class); LifecycleModule.register(binder, QueryResource.class); LifecycleModule.register(binder, ZkCoordinator.class); diff --git a/services/src/main/java/io/druid/cli/CliPeon.java b/services/src/main/java/io/druid/cli/CliPeon.java index 530e224ae539..e41284e138de 100644 --- a/services/src/main/java/io/druid/cli/CliPeon.java +++ b/services/src/main/java/io/druid/cli/CliPeon.java @@ -84,6 +84,7 @@ import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig; import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; +import io.druid.server.http.SegmentListerResource; import io.druid.server.metrics.QueryCountStatsProvider; import io.druid.server.QueryResource; import io.druid.server.initialization.jetty.ChatHandlerServerModule; @@ -204,6 +205,7 @@ public void configure(Binder binder) binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class); binder.bind(QueryCountStatsProvider.class).to(QueryResource.class).in(LazySingleton.class); Jerseys.addResource(binder, QueryResource.class); + Jerseys.addResource(binder, SegmentListerResource.class); LifecycleModule.register(binder, QueryResource.class); binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType)); LifecycleModule.register(binder, Server.class); diff --git a/services/src/main/java/io/druid/cli/CliRealtimeExample.java b/services/src/main/java/io/druid/cli/CliRealtimeExample.java index 9d9122d8a435..0f072cfbf635 100644 --- a/services/src/main/java/io/druid/cli/CliRealtimeExample.java +++ b/services/src/main/java/io/druid/cli/CliRealtimeExample.java @@ -24,7 +24,6 @@ import com.google.inject.Inject; import com.google.inject.Module; import com.google.inject.name.Names; - import io.airlift.airline.Command; import io.druid.client.DruidServer; import io.druid.client.InventoryView; @@ -118,6 +117,18 @@ public Iterable getInventory() { return ImmutableList.of(); } + + @Override + public boolean isStarted() + { + return true; + } + + @Override + public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) + { + return false; + } } private static class NoopDataSegmentPusher implements DataSegmentPusher @@ -168,11 +179,5 @@ public void unannounceSegments(Iterable segments) throws IOExceptio { // do nothing } - - @Override - public boolean isAnnounced(DataSegment segment) - { - return false; - } } } diff --git a/services/src/main/java/io/druid/guice/RealtimeModule.java b/services/src/main/java/io/druid/guice/RealtimeModule.java index aa0e357fc493..45861fb65dcc 100644 --- a/services/src/main/java/io/druid/guice/RealtimeModule.java +++ b/services/src/main/java/io/druid/guice/RealtimeModule.java @@ -39,9 +39,10 @@ import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig; import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; -import io.druid.server.metrics.QueryCountStatsProvider; import io.druid.server.QueryResource; +import io.druid.server.http.SegmentListerResource; import io.druid.server.initialization.jetty.JettyServerInitializer; +import io.druid.server.metrics.QueryCountStatsProvider; import org.eclipse.jetty.server.Server; import java.util.List; @@ -105,6 +106,7 @@ public void configure(Binder binder) binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class); binder.bind(QueryCountStatsProvider.class).to(QueryResource.class).in(LazySingleton.class); Jerseys.addResource(binder, QueryResource.class); + Jerseys.addResource(binder, SegmentListerResource.class); LifecycleModule.register(binder, QueryResource.class); LifecycleModule.register(binder, Server.class); }