From 343bdc94f04745d531bca073817f38da59b50efc Mon Sep 17 00:00:00 2001
From: Kevin Conaway
Date: Fri, 8 Sep 2017 12:12:06 -0400
Subject: [PATCH 1/6] Refactor SegmentTracker out of AppenderatorDriver

---
 .../segment/realtime/SegmentTracker.java      | 260 ++++++++++++++++++
 ...adata.java => SegmentTrackerMetadata.java} |   9 +-
 .../appenderator/AppenderatorDriver.java      | 213 ++------------
 .../appenderator/AppenderatorDriverTest.java  |  12 +-
 4 files changed, 292 insertions(+), 202 deletions(-)
 create mode 100644 server/src/main/java/io/druid/segment/realtime/SegmentTracker.java
 rename server/src/main/java/io/druid/segment/realtime/{appenderator/AppenderatorDriverMetadata.java => SegmentTrackerMetadata.java} (92%)

diff --git a/server/src/main/java/io/druid/segment/realtime/SegmentTracker.java b/server/src/main/java/io/druid/segment/realtime/SegmentTracker.java
new file mode 100644
index 000000000000..56aca4f2fe6c
--- /dev/null
+++ b/server/src/main/java/io/druid/segment/realtime/SegmentTracker.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import io.druid.data.input.InputRow;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Allocates and tracks actively used segments.
+ */
+public class SegmentTracker
+{
+  private static final Logger log = new Logger(SegmentTracker.class);
+
+  // All access to "activeSegments", "publishPendingSegments", and "lastSegmentIds" must be synchronized on
+  // "activeSegments".
+
+  // sequenceName -> start of segment interval -> segment we're currently adding data to
+  private final Map<String, NavigableMap<Long, SegmentIdentifier>> activeSegments = new TreeMap<>();
+
+  // sequenceName -> list of identifiers of segments waiting to be published.
+  // publishPendingSegments is always a superset of activeSegments, because a segment may no longer be
+  // receiving data even though it has not been published yet.
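+  //
+  // Illustrative lifecycle (a hypothetical sequence of calls, not taken from the tests):
+  //   getSegment(row, "seq")       allocates S1 -> S1 is in activeSegments and publishPendingSegments
+  //   moveSegmentOut("seq", [S1])               -> S1 remains only in publishPendingSegments
+  //   removePublished(["seq"])                  -> S1 is removed and returned to the caller for publishing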
+  private final Map<String, List<SegmentIdentifier>> publishPendingSegments = new HashMap<>();
+
+  // sequenceName -> most recently allocated segment
+  private final Map<String, String> lastSegmentIds = Maps.newHashMap();
+
+  private final SegmentAllocator segmentAllocator;
+
+  public SegmentTracker(
+      final SegmentAllocator segmentAllocator
+  )
+  {
+    this.segmentAllocator = segmentAllocator;
+  }
+
+  @VisibleForTesting
+  public Map<String, NavigableMap<Long, SegmentIdentifier>> getActiveSegments()
+  {
+    return activeSegments;
+  }
+
+  @VisibleForTesting
+  public Map<String, List<SegmentIdentifier>> getPublishPendingSegments()
+  {
+    return publishPendingSegments;
+  }
+
+  public void clear()
+  {
+    synchronized (activeSegments) {
+      activeSegments.clear();
+    }
+  }
+
+  public List<SegmentIdentifier> removePublished()
+  {
+    synchronized (activeSegments) {
+      return removePublished(
+          ImmutableList.copyOf(publishPendingSegments.keySet())
+      );
+    }
+  }
+
+  public List<SegmentIdentifier> removePublished(Collection<String> sequenceNames)
+  {
+    synchronized (activeSegments) {
+      final List<SegmentIdentifier> segments = sequenceNames.stream()
+                                                            .map(publishPendingSegments::remove)
+                                                            .filter(Objects::nonNull)
+                                                            .flatMap(Collection::stream)
+                                                            .collect(Collectors.toList());
+      sequenceNames.forEach(activeSegments::remove);
+
+      return segments;
+    }
+  }
+
+  public SegmentTrackerMetadata wrapMetadata(Object callerMetadata)
+  {
+    synchronized (activeSegments) {
+      return new SegmentTrackerMetadata(
+          ImmutableMap.copyOf(
+              Maps.transformValues(
+                  activeSegments,
+                  new Function<NavigableMap<Long, SegmentIdentifier>, List<SegmentIdentifier>>()
+                  {
+                    @Override
+                    public List<SegmentIdentifier> apply(NavigableMap<Long, SegmentIdentifier> input)
+                    {
+                      return ImmutableList.copyOf(input.values());
+                    }
+                  }
+              )
+          ),
+          ImmutableMap.copyOf(publishPendingSegments),
+          ImmutableMap.copyOf(lastSegmentIds),
+          callerMetadata
+      );
+    }
+  }
+
+  public void restoreFromMetadata(SegmentTrackerMetadata metadata)
+  {
+    synchronized (activeSegments) {
+      for (Map.Entry<String, List<SegmentIdentifier>> entry : metadata.getActiveSegments().entrySet()) {
+        final String sequenceName = entry.getKey();
+        final TreeMap<Long, SegmentIdentifier> segmentMap = Maps.newTreeMap();
+
+        activeSegments.put(sequenceName, segmentMap);
+
+        for (SegmentIdentifier identifier : entry.getValue()) {
+          segmentMap.put(identifier.getInterval().getStartMillis(), identifier);
+        }
+      }
+      publishPendingSegments.putAll(metadata.getPublishPendingSegments());
+      lastSegmentIds.putAll(metadata.getLastSegmentIds());
+    }
+  }
+
+  /**
+   * Return a segment usable for "timestamp". May return null if no segment can be allocated.
+   *
+   * @param row          input row
+   * @param sequenceName sequenceName for potential segment allocation
+   *
+   * @return identifier, or null
+   *
+   * @throws IOException if an exception occurs while allocating a segment
+   */
+  public SegmentIdentifier getSegment(final InputRow row, final String sequenceName) throws IOException
+  {
+    synchronized (activeSegments) {
+      final DateTime timestamp = row.getTimestamp();
+      final SegmentIdentifier existing = getActiveSegment(timestamp, sequenceName);
+      if (existing != null) {
+        return existing;
+      } else {
+        // Allocate new segment.
+        final SegmentIdentifier newSegment = segmentAllocator.allocate(
+            row,
+            sequenceName,
+            lastSegmentIds.get(sequenceName)
+        );
+
+
+        if (newSegment != null) {
+          /*
+          for (SegmentIdentifier identifier : appenderator.getSegments()) {
+            if (identifier.equals(newSegment)) {
+              throw new ISE(
+                  "WTF?! Allocated segment[%s] which conflicts with existing segment[%s].",
+                  newSegment,
+                  identifier
+              );
+            }
+          }
+          */
+
+          log.info("New segment[%s] for sequenceName[%s].", newSegment, sequenceName);
+          addSegment(sequenceName, newSegment);
+        } else {
+          // Well, we tried.
+          log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s].", timestamp, sequenceName);
", timestamp, sequenceName); + } + + return newSegment; + } + } + } + + /** + * Move a set of identifiers out from "active", making way for newer segments. + */ + public void moveSegmentOut(final String sequenceName, final List identifiers) + { + synchronized (activeSegments) { + final NavigableMap activeSegmentsForSequence = activeSegments.get(sequenceName); + if (activeSegmentsForSequence == null) { + throw new ISE("WTF?! Asked to remove segments for sequenceName[%s] which doesn't exist...", sequenceName); + } + + for (final SegmentIdentifier identifier : identifiers) { + log.info("Moving segment[%s] out of active list.", identifier); + final long key = identifier.getInterval().getStartMillis(); + if (!activeSegmentsForSequence.remove(key).equals(identifier)) { + throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", identifier); + } + } + } + } + + private SegmentIdentifier getActiveSegment(final DateTime timestamp, final String sequenceName) + { + synchronized (activeSegments) { + final NavigableMap activeSegmentsForSequence = activeSegments.get(sequenceName); + + if (activeSegmentsForSequence == null) { + return null; + } + + final Map.Entry candidateEntry = activeSegmentsForSequence.floorEntry(timestamp.getMillis()); + if (candidateEntry != null && candidateEntry.getValue().getInterval().contains(timestamp)) { + return candidateEntry.getValue(); + } else { + return null; + } + } + } + + private void addSegment(String sequenceName, SegmentIdentifier identifier) + { + synchronized (activeSegments) { + activeSegments.computeIfAbsent(sequenceName, k -> new TreeMap<>()) + .putIfAbsent(identifier.getInterval().getStartMillis(), identifier); + + publishPendingSegments.computeIfAbsent(sequenceName, k -> new ArrayList<>()) + .add(identifier); + lastSegmentIds.put(sequenceName, identifier.getIdentifierAsString()); + } + } + +} diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java b/server/src/main/java/io/druid/segment/realtime/SegmentTrackerMetadata.java similarity index 92% rename from server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java rename to server/src/main/java/io/druid/segment/realtime/SegmentTrackerMetadata.java index edff72572d9e..b22210ffc00a 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java +++ b/server/src/main/java/io/druid/segment/realtime/SegmentTrackerMetadata.java @@ -17,15 +17,16 @@ * under the License. 
*/ -package io.druid.segment.realtime.appenderator; +package io.druid.segment.realtime; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; import java.util.List; import java.util.Map; -public class AppenderatorDriverMetadata +public class SegmentTrackerMetadata { private final Map> activeSegments; private final Map> publishPendingSegments; @@ -33,7 +34,7 @@ public class AppenderatorDriverMetadata private final Object callerMetadata; @JsonCreator - public AppenderatorDriverMetadata( + public SegmentTrackerMetadata( @JsonProperty("activeSegments") Map> activeSegments, @JsonProperty("publishPendingSegments") Map> publishPendingSegments, @JsonProperty("lastSegmentIds") Map lastSegmentIds, @@ -73,7 +74,7 @@ public Object getCallerMetadata() @Override public String toString() { - return "AppenderatorDriverMetadata{" + + return "SegmentTrackerMetadata{" + "activeSegments=" + activeSegments + ", publishPendingSegments=" + publishPendingSegments + ", lastSegmentIds=" + lastSegmentIds + diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java index f003c01c5bf6..8f9fee9d29eb 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java @@ -26,10 +26,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; @@ -44,21 +41,16 @@ import io.druid.java.util.common.logger.Logger; import io.druid.query.SegmentDescriptor; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.SegmentTracker; +import io.druid.segment.realtime.SegmentTrackerMetadata; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; -import org.joda.time.DateTime; import java.io.Closeable; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.Objects; import java.util.Set; -import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -78,26 +70,11 @@ public class AppenderatorDriver implements Closeable private static final Logger log = new Logger(AppenderatorDriver.class); private final Appenderator appenderator; - private final SegmentAllocator segmentAllocator; + private final SegmentTracker segmentTracker; private final SegmentHandoffNotifier handoffNotifier; private final UsedSegmentChecker usedSegmentChecker; private final ObjectMapper objectMapper; private final FireDepartmentMetrics metrics; - - // All access to "activeSegments", "publishPendingSegments", and "lastSegmentId" must be synchronized on - // "activeSegments". 
- - // sequenceName -> start of segment interval -> segment we're currently adding data to - private final Map> activeSegments = new TreeMap<>(); - - // sequenceName -> list of identifiers of segments waiting for being published - // publishPendingSegments is always a super set of activeSegments because there can be some segments to which data - // are not added anymore, but not published yet. - private final Map> publishPendingSegments = new HashMap<>(); - - // sequenceName -> most recently allocated segment - private final Map lastSegmentIds = Maps.newHashMap(); - private final ListeningExecutorService publishExecutor; /** @@ -120,7 +97,9 @@ public AppenderatorDriver( ) { this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator"); - this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator"); + this.segmentTracker = new SegmentTracker( + Preconditions.checkNotNull(segmentAllocator, "segmentAllocator") + ); this.handoffNotifier = Preconditions.checkNotNull(handoffNotifierFactory, "handoffNotifierFactory") .createSegmentHandoffNotifier(appenderator.getDataSource()); this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker"); @@ -130,15 +109,8 @@ public AppenderatorDriver( } @VisibleForTesting - Map> getActiveSegments() - { - return activeSegments; - } - - @VisibleForTesting - Map> getPublishPendingSegments() - { - return publishPendingSegments; + SegmentTracker getSegmentTracker() { + return segmentTracker; } /** @@ -153,28 +125,15 @@ public Object startJob() { handoffNotifier.start(); - final AppenderatorDriverMetadata metadata = objectMapper.convertValue( + final SegmentTrackerMetadata metadata = objectMapper.convertValue( appenderator.startJob(), - AppenderatorDriverMetadata.class + SegmentTrackerMetadata.class ); log.info("Restored metadata[%s].", metadata); if (metadata != null) { - synchronized (activeSegments) { - for (Map.Entry> entry : metadata.getActiveSegments().entrySet()) { - final String sequenceName = entry.getKey(); - final TreeMap segmentMap = Maps.newTreeMap(); - - activeSegments.put(sequenceName, segmentMap); - - for (SegmentIdentifier identifier : entry.getValue()) { - segmentMap.put(identifier.getInterval().getStartMillis(), identifier); - } - } - publishPendingSegments.putAll(metadata.getPublishPendingSegments()); - lastSegmentIds.putAll(metadata.getLastSegmentIds()); - } + segmentTracker.restoreFromMetadata(metadata); return metadata.getCallerMetadata(); } else { @@ -182,26 +141,12 @@ public Object startJob() } } - private void addSegment(String sequenceName, SegmentIdentifier identifier) - { - synchronized (activeSegments) { - activeSegments.computeIfAbsent(sequenceName, k -> new TreeMap<>()) - .putIfAbsent(identifier.getInterval().getStartMillis(), identifier); - - publishPendingSegments.computeIfAbsent(sequenceName, k -> new ArrayList<>()) - .add(identifier); - lastSegmentIds.put(sequenceName, identifier.getIdentifierAsString()); - } - } - /** * Clears out all our state and also calls {@link Appenderator#clear()} on the underlying Appenderator. 
*/ public void clear() throws InterruptedException { - synchronized (activeSegments) { - activeSegments.clear(); - } + segmentTracker.clear(); appenderator.clear(); } @@ -226,7 +171,7 @@ public AppenderatorDriverAddResult add( Preconditions.checkNotNull(sequenceName, "sequenceName"); Preconditions.checkNotNull(committerSupplier, "committerSupplier"); - final SegmentIdentifier identifier = getSegment(row, sequenceName); + final SegmentIdentifier identifier = segmentTracker.getSegment(row, sequenceName); if (identifier != null) { try { @@ -276,7 +221,7 @@ public Object persist(final Committer committer) throws InterruptedException * * @return null if the input segmentsAndMetadata is null. Otherwise, a {@link ListenableFuture} for the submitted task * which returns {@link SegmentsAndMetadata} containing the segments successfully handed off and the metadata - * of the caller of {@link AppenderatorDriverMetadata} + * of the caller of {@link SegmentTrackerMetadata} */ public ListenableFuture registerHandoff(SegmentsAndMetadata segmentsAndMetadata) { @@ -292,7 +237,7 @@ public ListenableFuture registerHandoff(SegmentsAndMetadata return Futures.immediateFuture( new SegmentsAndMetadata( segmentsAndMetadata.getSegments(), - ((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()) + ((SegmentTrackerMetadata) segmentsAndMetadata.getCommitMetadata()) .getCallerMetadata() ) ); @@ -328,7 +273,7 @@ public void onSuccess(Object result) resultFuture.set( new SegmentsAndMetadata( segmentsAndMetadata.getSegments(), - ((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()) + ((SegmentTrackerMetadata) segmentsAndMetadata.getCommitMetadata()) .getCallerMetadata() ) ); @@ -362,91 +307,12 @@ public void close() handoffNotifier.close(); } - private SegmentIdentifier getActiveSegment(final DateTime timestamp, final String sequenceName) - { - synchronized (activeSegments) { - final NavigableMap activeSegmentsForSequence = activeSegments.get(sequenceName); - - if (activeSegmentsForSequence == null) { - return null; - } - - final Map.Entry candidateEntry = activeSegmentsForSequence.floorEntry(timestamp.getMillis()); - if (candidateEntry != null && candidateEntry.getValue().getInterval().contains(timestamp)) { - return candidateEntry.getValue(); - } else { - return null; - } - } - } - - /** - * Return a segment usable for "timestamp". May return null if no segment can be allocated. - * - * @param row input row - * @param sequenceName sequenceName for potential segment allocation - * - * @return identifier, or null - * - * @throws IOException if an exception occurs while allocating a segment - */ - private SegmentIdentifier getSegment(final InputRow row, final String sequenceName) throws IOException - { - synchronized (activeSegments) { - final DateTime timestamp = row.getTimestamp(); - final SegmentIdentifier existing = getActiveSegment(timestamp, sequenceName); - if (existing != null) { - return existing; - } else { - // Allocate new segment. - final SegmentIdentifier newSegment = segmentAllocator.allocate( - row, - sequenceName, - lastSegmentIds.get(sequenceName) - ); - - if (newSegment != null) { - for (SegmentIdentifier identifier : appenderator.getSegments()) { - if (identifier.equals(newSegment)) { - throw new ISE( - "WTF?! Allocated segment[%s] which conflicts with existing segment[%s].", - newSegment, - identifier - ); - } - } - - log.info("New segment[%s] for sequenceName[%s].", newSegment, sequenceName); - addSegment(sequenceName, newSegment); - } else { - // Well, we tried. 
- log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s]. ", timestamp, sequenceName); - } - - return newSegment; - } - } - } - /** * Move a set of identifiers out from "active", making way for newer segments. */ public void moveSegmentOut(final String sequenceName, final List identifiers) { - synchronized (activeSegments) { - final NavigableMap activeSegmentsForSequence = activeSegments.get(sequenceName); - if (activeSegmentsForSequence == null) { - throw new ISE("WTF?! Asked to remove segments for sequenceName[%s] which doesn't exist...", sequenceName); - } - - for (final SegmentIdentifier identifier : identifiers) { - log.info("Moving segment[%s] out of active list.", identifier); - final long key = identifier.getInterval().getStartMillis(); - if (!activeSegmentsForSequence.remove(key).equals(identifier)) { - throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", identifier); - } - } - } + segmentTracker.moveSegmentOut(sequenceName, identifiers); } /** @@ -463,16 +329,7 @@ public ListenableFuture publishAll( final Committer committer ) { - final List theSegments; - synchronized (activeSegments) { - final List sequenceNames = ImmutableList.copyOf(publishPendingSegments.keySet()); - theSegments = sequenceNames.stream() - .map(publishPendingSegments::remove) - .filter(Objects::nonNull) - .flatMap(Collection::stream) - .collect(Collectors.toList()); - sequenceNames.forEach(activeSegments::remove); - } + final List theSegments = segmentTracker.removePublished(); return publish(publisher, wrapCommitter(committer), theSegments); } @@ -493,15 +350,7 @@ public ListenableFuture publish( final Collection sequenceNames ) { - final List theSegments; - synchronized (activeSegments) { - theSegments = sequenceNames.stream() - .map(publishPendingSegments::remove) - .filter(Objects::nonNull) - .flatMap(Collection::stream) - .collect(Collectors.toList()); - sequenceNames.forEach(activeSegments::remove); - } + final List theSegments = segmentTracker.removePublished(sequenceNames); return publish(publisher, wrapCommitter(committer), theSegments); } @@ -553,7 +402,7 @@ private ListenableFuture publish( try { final boolean published = publisher.publishSegments( ImmutableSet.copyOf(segmentsAndMetadata.getSegments()), - ((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() + ((SegmentTrackerMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata() ); if (published) { @@ -603,27 +452,7 @@ private Supplier wrapCommitterSupplier(final Supplier comm private WrappedCommitter wrapCommitter(final Committer committer) { - final AppenderatorDriverMetadata wrappedMetadata; - synchronized (activeSegments) { - wrappedMetadata = new AppenderatorDriverMetadata( - ImmutableMap.copyOf( - Maps.transformValues( - activeSegments, - new Function, List>() - { - @Override - public List apply(NavigableMap input) - { - return ImmutableList.copyOf(input.values()); - } - } - ) - ), - ImmutableMap.copyOf(publishPendingSegments), - ImmutableMap.copyOf(lastSegmentIds), - committer.getMetadata() - ); - } + final SegmentTrackerMetadata wrappedMetadata = segmentTracker.wrapMetadata(committer.getMetadata()); return new WrappedCommitter() { diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java index d6430b6acff6..774697fb47ec 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java +++ 
b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java
@@ -139,8 +139,8 @@ public void testSimple() throws Exception
         committerSupplier.get(),
         ImmutableList.of("dummy")
     ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
-    Assert.assertFalse(driver.getActiveSegments().containsKey("dummy"));
-    Assert.assertFalse(driver.getPublishPendingSegments().containsKey("dummy"));
+    Assert.assertFalse(driver.getSegmentTracker().getActiveSegments().containsKey("dummy"));
+    Assert.assertFalse(driver.getSegmentTracker().getPublishPendingSegments().containsKey("dummy"));
 
     final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
                                                           .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
@@ -186,8 +186,8 @@ public void testMaxRowsPerSegment() throws Exception
         committerSupplier.get(),
         ImmutableList.of("dummy")
     ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
-    Assert.assertFalse(driver.getActiveSegments().containsKey("dummy"));
-    Assert.assertFalse(driver.getPublishPendingSegments().containsKey("dummy"));
+    Assert.assertFalse(driver.getSegmentTracker().getActiveSegments().containsKey("dummy"));
+    Assert.assertFalse(driver.getSegmentTracker().getPublishPendingSegments().containsKey("dummy"));
     final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published)
                                                           .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
     Assert.assertEquals(numSegments, segmentsAndMetadata.getSegments().size());
@@ -212,8 +212,8 @@ public void testHandoffTimeout() throws Exception
         committerSupplier.get(),
         ImmutableList.of("dummy")
     ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS);
-    Assert.assertFalse(driver.getActiveSegments().containsKey("dummy"));
-    Assert.assertFalse(driver.getPublishPendingSegments().containsKey("dummy"));
+    Assert.assertFalse(driver.getSegmentTracker().getActiveSegments().containsKey("dummy"));
+    Assert.assertFalse(driver.getSegmentTracker().getPublishPendingSegments().containsKey("dummy"));
 
     driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS);
   }

From c9a15ba0158bec50eb955e6ba3ef1b4cfdefafd4 Mon Sep 17 00:00:00 2001
From: Kevin Conaway
Date: Sun, 10 Sep 2017 13:43:27 -0400
Subject: [PATCH 2/6] Refactor RealtimePlumber to use SegmentAllocate API

---
 .../common/index/YeOldePlumberSchool.java     |   4 +-
 .../common/task/RealtimeIndexTask.java        |  44 ++--
 .../indexing/common/TestRealtimeTask.java     |   3 +-
 .../common/task/RealtimeIndexTaskTest.java    |  16 +-
 .../indexing/common/task/TaskSerdeTest.java   |   3 +-
 .../indexing/overlord/TaskLifecycleTest.java  |  16 +-
 .../indexing/worker/TaskAnnouncementTest.java |   3 +-
 .../indexing/RealtimeTuningConfig.java        |  21 --
 .../segment/realtime/FireDepartment.java      |   5 +-
 .../segment/realtime/RealtimeManager.java     |  11 +-
 .../segment/realtime/SegmentTracker.java      |  16 +-
 .../appenderator/AppenderatorDriver.java      |   3 +-
 .../appenderator/AppenderatorImpl.java        |   2 +-
 .../appenderator/AppenderatorPlumber.java     |   2 +-
 .../AppenderatorPlumberSchool.java            |   1 +
 .../SpecBasedSegmentAllocator.java            |  67 ++++++
 .../realtime/plumber/FlushingPlumber.java     |  25 ++-
 .../plumber/FlushingPlumberSchool.java        |   5 +-
 .../segment/realtime/plumber/Plumber.java     |   3 +-
 .../realtime/plumber/PlumberSchool.java       |   8 +-
 .../segment/realtime/plumber/Plumbers.java    |   3 +-
 .../realtime/plumber/RealtimePlumber.java     | 195 +++++++++++-------
 .../plumber/RealtimePlumberSchool.java        |   5 +-
 .../segment/realtime/RealtimeManagerTest.java |  13 +-
 .../AppenderatorDriverFailTest.java           |   3 +-
 .../appenderator/AppenderatorDriverTest.java  |  43 +---
.../appenderator/AppenderatorPlumberTest.java | 6 +- .../SpecBasedSegmentAllocatorTest.java | 94 +++++++++ .../appenderator/TestSegmentAllocator.java | 68 ++++++ .../plumber/RealtimePlumberSchoolTest.java | 104 ++++++++-- .../cli/validate/DruidJsonValidatorTest.java | 3 +- 31 files changed, 556 insertions(+), 239 deletions(-) create mode 100644 server/src/main/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocator.java create mode 100644 server/src/test/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocatorTest.java create mode 100644 server/src/test/java/io/druid/segment/realtime/appenderator/TestSegmentAllocator.java diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index 808cda53b948..d9dcef85bea1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -46,6 +46,7 @@ import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireHydrant; +import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.segment.realtime.plumber.Sink; @@ -93,6 +94,7 @@ public YeOldePlumberSchool( @Override public Plumber findPlumber( + final SegmentAllocator segmentAllocator, final DataSchema schema, final RealtimeTuningConfig config, final FireDepartmentMetrics metrics @@ -123,7 +125,7 @@ public Object startJob() } @Override - public int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException + public int add(InputRow row, String sequenceName, Supplier committerSupplier) throws IndexSizeExceededException { Sink sink = getSink(row.getTimestampFromEpoch()); if (sink == null) { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index fe2a92f2b9db..ec7ec1effd8f 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -34,7 +34,7 @@ import io.druid.discovery.DiscoveryDruidNode; import io.druid.discovery.DruidNodeDiscoveryProvider; import io.druid.discovery.LookupNodeService; -import io.druid.indexing.common.TaskLock; +import io.druid.indexing.appenderator.ActionBasedSegmentAllocator; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; import io.druid.indexing.common.actions.LockAcquireAction; @@ -57,6 +57,7 @@ import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.SegmentPublisher; +import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.firehose.ClippedFirehoseFactory; import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory; import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory; @@ -65,12 +66,10 @@ import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.segment.realtime.plumber.Plumbers; import io.druid.segment.realtime.plumber.RealtimePlumberSchool; -import io.druid.segment.realtime.plumber.VersioningPolicy; 
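The anonymous VersioningPolicy deleted just below acquired a task lock per interval solely to obtain a version string; after this patch, interval bucketing and version selection flow through the SegmentAllocator interface instead. For orientation, here is a minimal sketch of that contract. The class is hypothetical (not part of this patch); the SegmentIdentifier constructor and Granularity calls mirror the SpecBasedSegmentAllocator and TestSegmentAllocator added later in this series.

// Hypothetical example: the simplest possible SegmentAllocator, stamping
// every segment it creates with one pre-computed version string.
public class FixedVersionSegmentAllocator implements SegmentAllocator
{
  private final String dataSource;
  private final String version;
  private final Granularity segmentGranularity;
  private final ShardSpec shardSpec;

  public FixedVersionSegmentAllocator(
      String dataSource,
      String version,
      Granularity segmentGranularity,
      ShardSpec shardSpec
  )
  {
    this.dataSource = dataSource;
    this.version = version;
    this.segmentGranularity = segmentGranularity;
    this.shardSpec = shardSpec;
  }

  @Override
  public SegmentIdentifier allocate(InputRow row, String sequenceName, String previousSegmentId)
  {
    // Bucket the row's timestamp into a segment-granularity interval, then
    // stamp the identifier with the fixed version.
    final DateTime start = segmentGranularity.bucketStart(row.getTimestamp());
    final Interval interval = new Interval(start, segmentGranularity.increment(start));
    return new SegmentIdentifier(dataSource, interval, version, shardSpec);
  }
}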
import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; import org.apache.commons.io.FileUtils; import org.joda.time.DateTime; -import org.joda.time.Interval; import java.io.File; import java.io.IOException; @@ -261,35 +260,10 @@ public void unannounceSegments(Iterable segments) throws IOExceptio } }; - // NOTE: getVersion will block if there is lock contention, which will block plumber.getSink - // NOTE: (and thus the firehose) - - // Shouldn't usually happen, since we don't expect people to submit tasks that intersect with the - // realtime window, but if they do it can be problematic. If we decide to care, we can use more threads in - // the plumber such that waiting for the coordinator doesn't block data processing. - final VersioningPolicy versioningPolicy = new VersioningPolicy() - { - @Override - public String getVersion(final Interval interval) - { - try { - // Side effect: Calling getVersion causes a lock to be acquired - final TaskLock myLock = toolbox.getTaskActionClient() - .submit(new LockAcquireAction(interval, lockTimeoutMs)); - - return myLock.getVersion(); - } - catch (IOException e) { - throw Throwables.propagate(e); - } - } - }; - DataSchema dataSchema = spec.getDataSchema(); RealtimeIOConfig realtimeIOConfig = spec.getIOConfig(); RealtimeTuningConfig tuningConfig = spec.getTuningConfig() - .withBasePersistDirectory(toolbox.getPersistDir()) - .withVersioningPolicy(versioningPolicy); + .withBasePersistDirectory(toolbox.getPersistDir()); final FireDepartment fireDepartment = new FireDepartment( dataSchema, @@ -305,6 +279,8 @@ public String getVersion(final Interval interval) ); this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate(); + final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox); + // NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means // NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip // NOTE: (partitionNum_index.zip for HDFS data storage) and descriptor.json (partitionNum_descriptor.json for @@ -325,7 +301,7 @@ public String getVersion(final Interval interval) toolbox.getObjectMapper() ); - this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics); + this.plumber = plumberSchool.findPlumber(segmentAllocator, dataSchema, tuningConfig, metrics); Supplier committerSupplier = null; final File firehoseTempDir = toolbox.getFirehoseTemporaryDir(); @@ -367,10 +343,13 @@ public String getVersion(final Interval interval) } } + final String sequenceName = getId(); + // Time to read data! while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) { Plumbers.addNextRow( committerSupplier, + sequenceName, firehose, plumber, tuningConfig.isReportParseExceptions(), @@ -532,6 +511,11 @@ && isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory) firehoseFactory). 
&& isFirehoseDrainableByClosing(((ClippedFirehoseFactory) firehoseFactory).getDelegate())); } + protected SegmentAllocator createSegmentAllocator(TaskToolbox toolbox) + { + return new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), spec.getDataSchema()); + } + public static class TaskActionSegmentPublisher implements SegmentPublisher { final Task task; diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java index 77a0f0fa93e2..688a64523628 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java @@ -32,6 +32,7 @@ import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; @@ -63,7 +64,7 @@ public TestRealtimeTask( { @Override public Plumber findPlumber( - DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + SegmentAllocator segmentAllocator, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics ) { return null; diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index c40d8e30ec9c..4c44cb599d25 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -33,7 +33,7 @@ import com.google.common.util.concurrent.MoreExecutors; //CHECKSTYLE.OFF: Regexp import com.metamx.common.logger.Logger; -//CHECKSTYLE.ON: Regexp + import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.core.LoggingEmitter; import com.metamx.emitter.service.ServiceEmitter; @@ -100,11 +100,14 @@ import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.SegmentLoaderConfig; import io.druid.segment.loading.SegmentLoaderLocalCacheManager; import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.FireDepartment; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.TestSegmentAllocator; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory; @@ -115,6 +118,7 @@ import org.easymock.EasyMock; import org.hamcrest.CoreMatchers; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -874,11 +878,12 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportPa private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions, long handoffTimeout) { ObjectMapper objectMapper = new DefaultObjectMapper(); 
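+    // The granularity spec is pulled out to a local (rather than inlined in the
+    // DataSchema below) so the createSegmentAllocator override further down can
+    // reuse the same segmentGranularity for its fixed-version test allocator.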
+ GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null); DataSchema dataSchema = new DataSchema( "test_ds", null, new AggregatorFactory[]{new CountAggregatorFactory("rows"), new LongSumAggregatorFactory("met1", "met1")}, - new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), + granularitySpec, objectMapper ); RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( @@ -915,6 +920,13 @@ protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) { return true; } + + @Override + protected SegmentAllocator createSegmentAllocator(TaskToolbox toolbox) + { + String version = DateTime.now(DateTimeZone.UTC).toString(); + return new TestSegmentAllocator(dataSchema.getDataSource(), version, granularitySpec.getSegmentGranularity()); + } }; } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 02dfdbbd050a..85d3bd9b048c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -48,6 +48,7 @@ import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; @@ -476,7 +477,7 @@ public void testRealtimeIndexTaskSerde() throws Exception { @Override public Plumber findPlumber( - DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + SegmentAllocator segmentAllocator, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics ) { return null; diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 8d38b8abc10c..763ac70c52be 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -95,6 +95,7 @@ import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.DataSegmentMover; @@ -107,6 +108,8 @@ import io.druid.segment.loading.StorageLocationConfig; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentTest; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.TestSegmentAllocator; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.server.DruidNode; @@ -119,6 +122,7 @@ import io.druid.timeline.partition.NoneShardSpec; import org.easymock.EasyMock; import org.joda.time.DateTime; +import org.joda.time.DateTimeZone; import org.joda.time.Hours; import org.joda.time.Interval; import org.joda.time.Period; @@ -1182,11 +1186,12 @@ private TaskStatus 
runTask(final Task task) throws Exception private RealtimeIndexTask newRealtimeIndexTask() { String taskId = StringUtils.format("rt_task_%s", System.currentTimeMillis()); + GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null); DataSchema dataSchema = new DataSchema( "test_ds", null, new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")}, - new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), + granularitySpec, mapper ); RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig( @@ -1218,6 +1223,13 @@ private RealtimeIndexTask newRealtimeIndexTask() new TaskResource(taskId, 1), fireDepartment, null - ); + ) { + @Override + protected SegmentAllocator createSegmentAllocator(TaskToolbox toolbox) + { + String version = DateTime.now(DateTimeZone.UTC).toString(); + return new TestSegmentAllocator(dataSchema.getDataSource(), version, granularitySpec.getSegmentGranularity()); + } + }; } } diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java index 1d160e99bff6..b0c9698ea826 100644 --- a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java @@ -33,6 +33,7 @@ import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; @@ -64,7 +65,7 @@ public void testBackwardsCompatibleSerde() throws Exception { @Override public Plumber findPlumber( - DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + SegmentAllocator segmentAllocator, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics ) { return null; diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index 5338055a4d8b..795044ced3bc 100644 --- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -240,27 +240,6 @@ public long getAlertTimeout() return alertTimeout; } - public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) - { - return new RealtimeTuningConfig( - maxRowsInMemory, - intermediatePersistPeriod, - windowPeriod, - basePersistDirectory, - policy, - rejectionPolicyFactory, - maxPendingPersists, - shardSpec, - indexSpec, - true, - persistThreadPriority, - mergeThreadPriority, - reportParseExceptions, - handoffConditionTimeout, - alertTimeout - ); - } - public RealtimeTuningConfig withBasePersistDirectory(File dir) { return new RealtimeTuningConfig( diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java index eca78928eccf..fb2c35ff8903 100644 --- a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java +++ b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java @@ -28,6 +28,8 @@ import io.druid.segment.indexing.IngestionSpec; import io.druid.segment.indexing.RealtimeIOConfig; import 
io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SpecBasedSegmentAllocator; import io.druid.segment.realtime.plumber.Plumber; import java.io.IOException; @@ -91,7 +93,8 @@ public RealtimeTuningConfig getTuningConfig() public Plumber findPlumber() { - return ioConfig.getPlumberSchool().findPlumber(dataSchema, tuningConfig, metrics); + final SegmentAllocator segmentAllocator = new SpecBasedSegmentAllocator(dataSchema, tuningConfig); + return ioConfig.getPlumberSchool().findPlumber(segmentAllocator, dataSchema, tuningConfig, metrics); } public boolean checkFirehoseV2() diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index fd30f61b29b5..70d1dbbb874e 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -338,6 +338,7 @@ private boolean runFirehoseV2(FirehoseV2 firehose) throws Exception log.info("FirehoseV2 started"); final Supplier committerSupplier = Committers.supplierFromFirehoseV2(firehose); + final String sequenceName = generateSequenceName(); boolean haveRow = true; while (haveRow) { if (Thread.interrupted() || stopping) { @@ -348,7 +349,7 @@ private boolean runFirehoseV2(FirehoseV2 firehose) throws Exception try { inputRow = firehose.currRow(); if (inputRow != null) { - numRows = plumber.add(inputRow, committerSupplier); + numRows = plumber.add(inputRow, sequenceName, committerSupplier); if (numRows < 0) { metrics.incrementThrownAway(); log.debug("Throwing away event[%s]", inputRow); @@ -379,15 +380,21 @@ private boolean runFirehoseV2(FirehoseV2 firehose) throws Exception private boolean runFirehose(Firehose firehose) { final Supplier committerSupplier = Committers.supplierFromFirehose(firehose); + final String sequenceName = generateSequenceName(); while (firehose.hasMore()) { if (Thread.interrupted() || stopping) { return false; } - Plumbers.addNextRow(committerSupplier, firehose, plumber, config.isReportParseExceptions(), metrics); + Plumbers.addNextRow(committerSupplier, sequenceName, firehose, plumber, config.isReportParseExceptions(), metrics); } return true; } + private String generateSequenceName() + { + return "index_" + fireDepartment.getDataSchema().getDataSource() + "_" + config.getShardSpec().getPartitionNum(); + } + public QueryRunner getQueryRunner(Query query) { QueryRunnerFactory> factory = conglomerate.findFactory(query); diff --git a/server/src/main/java/io/druid/segment/realtime/SegmentTracker.java b/server/src/main/java/io/druid/segment/realtime/SegmentTracker.java index 56aca4f2fe6c..3494eed093a7 100644 --- a/server/src/main/java/io/druid/segment/realtime/SegmentTracker.java +++ b/server/src/main/java/io/druid/segment/realtime/SegmentTracker.java @@ -64,9 +64,7 @@ public class SegmentTracker private final SegmentAllocator segmentAllocator; - public SegmentTracker( - final SegmentAllocator segmentAllocator - ) + public SegmentTracker(SegmentAllocator segmentAllocator) { this.segmentAllocator = segmentAllocator; } @@ -182,18 +180,6 @@ public SegmentIdentifier getSegment(final InputRow row, final String sequenceNam if (newSegment != null) { - /* - for (SegmentIdentifier identifier : appenderator.getSegments()) { - if (identifier.equals(newSegment)) { - throw new ISE( - "WTF?! 
Allocated segment[%s] which conflicts with existing segment[%s].", - newSegment, - identifier - ); - } - } - */ - log.info("New segment[%s] for sequenceName[%s].", newSegment, sequenceName); addSegment(sequenceName, newSegment); } else { diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java index 8f9fee9d29eb..bd5d3d150b0d 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java @@ -109,7 +109,8 @@ public AppenderatorDriver( } @VisibleForTesting - SegmentTracker getSegmentTracker() { + SegmentTracker getSegmentTracker() + { return segmentTracker; } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index fe3c4886f5fa..48af43206530 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -762,7 +762,7 @@ private Object bootstrapSinksFromDisk() try { final SegmentIdentifier identifier = objectMapper.readValue( - new File(sinkDir, "identifier.json"), + new File(sinkDir, IDENTIFIER_FILE_NAME), SegmentIdentifier.class ); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java index c3e8678ba304..81fa87d0b23d 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java @@ -145,7 +145,7 @@ public Object startJob() } @Override - public int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException + public int add(InputRow row, String sequenceName, Supplier committerSupplier) throws IndexSizeExceededException { final SegmentIdentifier identifier = getSegmentIdentifier(row.getTimestampFromEpoch()); if (identifier == null) { diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberSchool.java index 66ddd9d4747c..9fdd8de777db 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberSchool.java @@ -55,6 +55,7 @@ public AppenderatorPlumberSchool( @Override public Plumber findPlumber( + final SegmentAllocator segmentAllocator, final DataSchema schema, final RealtimeTuningConfig config, final FireDepartmentMetrics metrics diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocator.java new file mode 100644 index 000000000000..69f9eeac99a8 --- /dev/null +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocator.java @@ -0,0 +1,67 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.appenderator; + +import io.druid.data.input.InputRow; +import io.druid.java.util.common.granularity.Granularity; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.realtime.plumber.VersioningPolicy; +import io.druid.timeline.partition.ShardSpec; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.io.IOException; + +/** + * Creates segments based on the realtime configuration spec + */ +public class SpecBasedSegmentAllocator implements SegmentAllocator +{ + private final String dataSource; + private final VersioningPolicy versioningPolicy; + private final Granularity segmentGranularity; + private final ShardSpec shardSpec; + + public SpecBasedSegmentAllocator(DataSchema schema, RealtimeTuningConfig tuningConfig) + { + this.dataSource = schema.getDataSource(); + this.versioningPolicy = tuningConfig.getVersioningPolicy(); + this.segmentGranularity = schema.getGranularitySpec().getSegmentGranularity(); + this.shardSpec = tuningConfig.getShardSpec(); + } + + @Override + public SegmentIdentifier allocate( + InputRow row, String sequenceName, String previousSegmentId + ) throws IOException + { + final DateTime intervalStart = segmentGranularity.bucketStart(row.getTimestamp()); + + final Interval interval = new Interval( + intervalStart, + segmentGranularity.increment(intervalStart) + ); + + String version = versioningPolicy.getVersion(interval); + + return new SegmentIdentifier(dataSource, interval, version, shardSpec); + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java index 3938d017a923..6bdd21cd5b97 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumber.java @@ -37,6 +37,8 @@ import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -73,8 +75,8 @@ public FlushingPlumber( IndexIO indexIO, Cache cache, CacheConfig cacheConfig, - ObjectMapper objectMapper - + ObjectMapper objectMapper, + SegmentAllocator segmentAllocator ) { super( @@ -92,7 +94,8 @@ public FlushingPlumber( indexIO, cache, cacheConfig, - objectMapper + objectMapper, + segmentAllocator ); this.flushDuration = flushDuration; @@ -117,7 +120,7 @@ public Object startJob() return retVal; } - protected void flushAfterDuration(final long truncatedTime, final Sink sink) + protected void flushAfterDuration(final SegmentIdentifier 
identifier, final Sink sink) { log.info( "Abandoning segment %s at %s", @@ -134,7 +137,7 @@ protected void flushAfterDuration(final long truncatedTime, final Sink sink) public ScheduledExecutors.Signal call() throws Exception { log.info("Abandoning segment %s", sink.getSegment().getIdentifier()); - abandonSegment(truncatedTime, sink); + abandonSegment(identifier, sink); return ScheduledExecutors.Signal.STOP; } } @@ -185,16 +188,16 @@ public ScheduledExecutors.Signal doCall() getRejectionPolicy().getCurrMaxTime().minus(windowMillis) ).getMillis(); - List> sinksToPush = Lists.newArrayList(); - for (Map.Entry entry : getSinks().entrySet()) { - final Long intervalStart = entry.getKey(); - if (intervalStart < minTimestamp) { + List> sinksToPush = Lists.newArrayList(); + for (Map.Entry entry : getSinks().entrySet()) { + DateTime intervalStart = entry.getKey().getInterval().getStart(); + if (intervalStart.isBefore(minTimestamp)) { log.info("Adding entry[%s] to flush.", entry); sinksToPush.add(entry); } } - for (final Map.Entry entry : sinksToPush) { + for (final Map.Entry entry : sinksToPush) { flushAfterDuration(entry.getKey(), entry.getValue()); } @@ -214,7 +217,7 @@ public void finishJob() { log.info("Stopping job"); - for (final Map.Entry entry : getSinks().entrySet()) { + for (final Map.Entry entry : getSinks().entrySet()) { abandonSegment(entry.getKey(), entry.getValue()); } shutdownExecutors(); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java index 11416650ff9a..a9b5a699c33b 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java @@ -34,6 +34,7 @@ import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Duration; @@ -102,6 +103,7 @@ public FlushingPlumberSchool( @Override public Plumber findPlumber( + final SegmentAllocator segmentAllocator, final DataSchema schema, final RealtimeTuningConfig config, final FireDepartmentMetrics metrics @@ -122,7 +124,8 @@ public Plumber findPlumber( indexIO, cache, cacheConfig, - objectMapper + objectMapper, + segmentAllocator ); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java index d9ff2342d3b8..916b5503c4fe 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java @@ -38,12 +38,13 @@ public interface Plumber /** * @param row the row to insert + * @param sequenceName sequenceName for this row's segment * @param committerSupplier supplier of a committer associated with all data that has been added, including this row * * @return - positive numbers indicate how many summarized rows exist in the index for that timestamp, * -1 means a row was thrown away because it was too late */ - int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException; + int add(InputRow row, String sequenceName, Supplier committerSupplier) throws IndexSizeExceededException; QueryRunner getQueryRunner(Query query); diff --git 
index 8c49822b60d5..02e25aae3a0c 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java
@@ -24,6 +24,7 @@
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;

 /**
  */
@@ -39,6 +40,11 @@ public interface PlumberSchool
    *
    * @return returns a plumber
    */
-  public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics);
+  Plumber findPlumber(
+      SegmentAllocator segmentAllocator,
+      DataSchema schema,
+      RealtimeTuningConfig config,
+      FireDepartmentMetrics metrics
+  );
 }
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java
index d1053ea2a1da..23499eeb4828 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java
@@ -41,6 +41,7 @@ private Plumbers()
   public static void addNextRow(
       final Supplier<Committer> committerSupplier,
+      final String sequenceName,
       final Firehose firehose,
       final Plumber plumber,
       final boolean reportParseExceptions,
@@ -73,7 +74,7 @@ public static void addNextRow(
     final int numRows;
     try {
-      numRows = plumber.add(inputRow, committerSupplier);
+      numRows = plumber.add(inputRow, sequenceName, committerSupplier);
     }
     catch (IndexSizeExceededException e) {
       // Shouldn't happen if this is only being called by a single thread.
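A caller-side sketch of the new three-argument contract, mirroring (and simplifying) Plumbers.addNextRow above. This is illustrative only, not code from this patch set: the class name and loop shape are hypothetical, and only the Plumber, Firehose, and Supplier<Committer> calls come from the interfaces shown here.

import com.google.common.base.Supplier;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.realtime.plumber.Plumber;

public class SequenceNameIngestSketch
{
  // Drain a firehose into a plumber, attributing every row to a single
  // sequenceName so the plumber can allocate (or reuse) a segment per
  // (sequenceName, interval) instead of keying sinks by truncated timestamp.
  public static void drain(
      final Firehose firehose,
      final Plumber plumber,
      final String sequenceName,
      final Supplier<Committer> committerSupplier
  ) throws IndexSizeExceededException
  {
    while (firehose.hasMore()) {
      final InputRow row = firehose.nextRow();
      if (row == null) {
        // Unparseable rows can surface as null; skip them.
        continue;
      }
      final int numRows = plumber.add(row, sequenceName, committerSupplier);
      if (numRows == -1) {
        // Thrown away: rejected by the rejection policy, or no segment
        // could be allocated for the row's timestamp.
        continue;
      }
    }
  }
}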
diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
index 3fe234db39ae..ede022b376a8 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -46,6 +46,7 @@ import io.druid.java.util.common.ISE;
 import io.druid.java.util.common.Intervals;
 import io.druid.java.util.common.Pair;
+import io.druid.java.util.common.RE;
 import io.druid.java.util.common.StringUtils;
 import io.druid.java.util.common.concurrent.ScheduledExecutors;
 import io.druid.java.util.common.granularity.Granularity;
@@ -68,6 +69,10 @@ import io.druid.segment.realtime.FireDepartmentMetrics;
 import io.druid.segment.realtime.FireHydrant;
 import io.druid.segment.realtime.SegmentPublisher;
+import io.druid.segment.realtime.SegmentTracker;
+import io.druid.segment.realtime.SegmentTrackerMetadata;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
 import io.druid.segment.realtime.appenderator.SinkQuerySegmentWalker;
 import io.druid.server.coordination.DataSegmentAnnouncer;
 import io.druid.timeline.DataSegment;
@@ -96,6 +101,8 @@ public class RealtimePlumber implements Plumber
 {
   private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class);
   private static final int WARN_DELAY = 1000;
+  private static final String IDENTIFIER_FILE_NAME = "identifier.json";
+  private static final String SEGMENT_TRACKING_FILE_NAME = "segment-tracking.json";

   private final DataSchema schema;
   private final RealtimeTuningConfig config;
@@ -106,11 +113,13 @@ public class RealtimePlumber implements Plumber
   private final SegmentPublisher segmentPublisher;
   private final SegmentHandoffNotifier handoffNotifier;
   private final Object handoffCondition = new Object();
-  private final Map<Long, Sink> sinks = Maps.newConcurrentMap();
+  private final Map<SegmentIdentifier, Sink> sinks = Maps.newConcurrentMap();
   private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(
       String.CASE_INSENSITIVE_ORDER
   );
   private final QuerySegmentWalker texasRanger;
+  private final SegmentTracker segmentTracker;
+  private final ObjectMapper objectMapper;

   private final Cache cache;
@@ -142,7 +151,8 @@ public RealtimePlumber(
     IndexIO indexIO,
     Cache cache,
     CacheConfig cacheConfig,
-    ObjectMapper objectMapper
+    ObjectMapper objectMapper,
+    SegmentAllocator segmentAllocator
 )
 {
   this.schema = schema;
@@ -166,6 +176,8 @@ public RealtimePlumber(
       cache,
       cacheConfig
   );
+  this.segmentTracker = new SegmentTracker(segmentAllocator);
+  this.objectMapper = objectMapper;

   log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy());
 }
@@ -185,7 +197,7 @@ public RejectionPolicy getRejectionPolicy()
     return rejectionPolicy;
   }

-  public Map<Long, Sink> getSinks()
+  protected Map<SegmentIdentifier, Sink> getSinks()
   {
     return sinks;
   }
@@ -205,10 +217,23 @@ public Object startJob()
   }

   @Override
-  public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
+  public int add(InputRow row, String sequenceName, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
   {
     long messageTimestamp = row.getTimestampFromEpoch();
-    final Sink sink = getSink(messageTimestamp);
+    if (!rejectionPolicy.accept(messageTimestamp)) {
+      return -1;
+    }
+
+    final SegmentIdentifier identifier;
+
+    try {
+      identifier = segmentTracker.getSegment(row, sequenceName);
+    }
+    catch (IOException ex) {
+      throw new RE(ex, "Could not allocate segment for [%s]", row.getTimestamp());
+    }
+
+    final Sink sink = getSink(identifier);
     metrics.reportMessageMaxTimestamp(messageTimestamp);
     if (sink == null) {
       return -1;
@@ -223,35 +248,21 @@ public int add(InputRow row, Supplier<Committer> committerSupplier) throws Index
     return numRows;
   }

-  private Sink getSink(long timestamp)
+  private Sink getSink(SegmentIdentifier identifier)
   {
-    if (!rejectionPolicy.accept(timestamp)) {
-      return null;
-    }
-
-    final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
-    final VersioningPolicy versioningPolicy = config.getVersioningPolicy();
-
-    DateTime truncatedDateTime = segmentGranularity.bucketStart(DateTimes.utc(timestamp));
-    final long truncatedTime = truncatedDateTime.getMillis();
-
-    Sink retVal = sinks.get(truncatedTime);
+    Sink retVal = sinks.get(identifier);

     if (retVal == null) {
-      final Interval sinkInterval = new Interval(
-          truncatedDateTime,
-          segmentGranularity.increment(truncatedDateTime)
-      );
-
       retVal = new Sink(
-          sinkInterval,
+          identifier.getInterval(),
           schema,
-          config.getShardSpec(),
-          versioningPolicy.getVersion(sinkInterval),
+          identifier.getShardSpec(),
+          identifier.getVersion(),
           config.getMaxRowsInMemory(),
           config.isReportParseExceptions()
       );
-      addSink(retVal);
+
+      addSink(identifier, retVal);
     }
@@ -268,10 +279,11 @@ public <T> QueryRunner<T> getQueryRunner(final Query<T> query)
   @Override
   public void persist(final Committer committer)
   {
-    final List<Pair<FireHydrant, Interval>> indexesToPersist = Lists.newArrayList();
-    for (Sink sink : sinks.values()) {
+    final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
+    for (Map.Entry<SegmentIdentifier, Sink> entry : sinks.entrySet()) {
+      final Sink sink = entry.getValue();
       if (sink.swappable()) {
-        indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval()));
+        indexesToPersist.add(Pair.of(sink.swap(), entry.getKey()));
       }
     }
@@ -321,7 +333,7 @@ handed off instead of individual segments being handed off (that is, if one of t
      */
     long persistThreadCpuTime = VMUtils.safeGetThreadCpuTime();
     try {
-      for (Pair<FireHydrant, Interval> pair : indexesToPersist) {
+      for (Pair<FireHydrant, SegmentIdentifier> pair : indexesToPersist) {
         metrics.incrementRowOutputCount(
             persistHydrant(
                 pair.lhs, schema, pair.rhs, metadataElems
@@ -354,10 +366,10 @@ handed off instead of individual segments being handed off (that is, if one of t
   }

   // Submits persist-n-merge task for a Sink to the mergeExecutor
-  private void persistAndMerge(final long truncatedTime, final Sink sink)
+  private void persistAndMerge(final SegmentIdentifier identifier, final Sink sink)
   {
     final String threadName = StringUtils.format(
-        "%s-%s-persist-n-merge", schema.getDataSource(), DateTimes.utc(truncatedTime)
+        "%s-%s-persist-n-merge", schema.getDataSource(), identifier
     );
     mergeExecutor.execute(
         new ThreadRenamingRunnable(threadName)
         {
@@ -370,7 +382,7 @@ public void doRun()
           {
             try {
               // Bail out if this sink has been abandoned by a previously-executed task.
-              if (sinks.get(truncatedTime) != sink) {
+              if (sinks.get(identifier) != sink) {
                 log.info("Sink[%s] was abandoned, bailing out of persist-n-merge.", sink);
                 return;
               }
@@ -400,7 +412,7 @@ public void doRun()
               synchronized (hydrant) {
                 if (!hydrant.hasSwapped()) {
                   log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink);
-                  final int rowCount = persistHydrant(hydrant, schema, interval, null);
+                  final int rowCount = persistHydrant(hydrant, schema, identifier, null);
                   metrics.incrementRowOutputCount(rowCount);
                 }
               }
@@ -454,7 +466,7 @@ public void doRun()
              // We're trying to shut down, and this segment failed to push. Let's just get rid of it.
              // This call will also delete possibly-partially-written files, so we don't need to do it explicitly.
              cleanShutdown = false;
-             abandonSegment(truncatedTime, sink);
+             abandonSegment(identifier, sink);
            }
          }
          finally {
@@ -472,7 +484,7 @@ mergeExecutor, new Runnable()
        {
          @Override
          public void run()
          {
-           abandonSegment(sink.getInterval().getStartMillis(), sink);
+           abandonSegment(identifier, sink);
            metrics.incrementHandOffCount();
          }
        }
@@ -486,7 +498,7 @@ public void finishJob()

    shuttingDown = true;

-   for (final Map.Entry<Long, Sink> entry : sinks.entrySet()) {
+   for (final Map.Entry<SegmentIdentifier, Sink> entry : sinks.entrySet()) {
      persistAndMerge(entry.getKey(), entry.getValue());
    }
@@ -586,8 +598,6 @@ protected void shutdownExecutors()

  protected Object bootstrapSinksFromDisk()
  {
-   final VersioningPolicy versioningPolicy = config.getVersioningPolicy();
-
    File baseDir = computeBaseDir(schema);
    if (baseDir == null || !baseDir.exists()) {
      return null;
@@ -601,6 +611,50 @@ protected Object bootstrapSinksFromDisk()
    Object metadata = null;
    long latestCommitTime = 0;
    for (File sinkDir : files) {
+     final File identifierFile = new File(sinkDir, IDENTIFIER_FILE_NAME);
+     if (!identifierFile.isFile()) {
+       // No identifier in this sinkDir; it must not actually be a sink directory. Skip it.
+       continue;
+     }
+
+     final SegmentIdentifier identifier;
+
+     try {
+       identifier = objectMapper.readValue(
+           identifierFile,
+           SegmentIdentifier.class
+       );
+     }
+     catch (IOException e) {
+       log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource())
+          .addData("sinkDir", sinkDir)
+          .emit();
+       continue;
+     }
+
+     final File segmentTrackingFile = new File(sinkDir, SEGMENT_TRACKING_FILE_NAME);
+     if (!segmentTrackingFile.isFile()) {
+       // No segment tracking metadata in this sinkDir; skip it.
+       continue;
+     }
+
+     final SegmentTrackerMetadata segmentTrackerMetadata;
+
+     try {
+       segmentTrackerMetadata = objectMapper.readValue(
+           segmentTrackingFile,
+           SegmentTrackerMetadata.class
+       );
+
+       segmentTracker.restoreFromMetadata(segmentTrackerMetadata);
+     }
+     catch (IOException e) {
+       log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource())
+          .addData("sinkDir", sinkDir)
+          .emit();
+       continue;
+     }
+
      final Interval sinkInterval = Intervals.of(sinkDir.getName().replace("_", "/"));

      //final File[] sinkFiles = sinkDir.listFiles();
@@ -683,13 +737,7 @@ public int compare(File o1, File o2)
              hydrants.add(
                  new FireHydrant(
                      new QueryableIndexSegment(
-                        DataSegment.makeDataSegmentIdentifier(
-                            schema.getDataSource(),
-                            sinkInterval.getStart(),
-                            sinkInterval.getEnd(),
-                            versioningPolicy.getVersion(sinkInterval),
-                            config.getShardSpec()
-                        ),
+                        identifier.getIdentifierAsString(),
                         queryableIndex
                     ),
                     Integer.parseInt(segmentDir.getName())
@@ -707,20 +755,20 @@ public int compare(File o1, File o2)
      final Sink currSink = new Sink(
          sinkInterval,
          schema,
-         config.getShardSpec(),
-         versioningPolicy.getVersion(sinkInterval),
+         identifier.getShardSpec(),
+         identifier.getVersion(),
          config.getMaxRowsInMemory(),
          config.isReportParseExceptions(),
          hydrants
      );
-     addSink(currSink);
+     addSink(identifier, currSink);
    }
    return metadata;
  }

- private void addSink(final Sink sink)
+ private void addSink(SegmentIdentifier identifier, final Sink sink)
  {
-   sinks.put(sink.getInterval().getStartMillis(), sink);
+   sinks.put(identifier, sink);
    metrics.setSinkCount(sinks.size());
    sinkTimeline.add(
        sink.getInterval(),
@@ -810,17 +858,17 @@ private void mergeAndPush()
        minTimestampAsDate
    );

-   List<Map.Entry<Long, Sink>> sinksToPush = Lists.newArrayList();
-   for (Map.Entry<Long, Sink> entry : sinks.entrySet()) {
-     final Long intervalStart = entry.getKey();
-     if (intervalStart < minTimestamp) {
+   List<Map.Entry<SegmentIdentifier, Sink>> sinksToPush = Lists.newArrayList();
+   for (Map.Entry<SegmentIdentifier, Sink> entry : sinks.entrySet()) {
+     final DateTime intervalStart = entry.getKey().getInterval().getStart();
+     if (intervalStart.isBefore(minTimestamp)) {
        log.info("Adding entry [%s] for merge and push.", entry);
        sinksToPush.add(entry);
      } else {
        log.info(
            "Skipping persist and merge for entry [%s] : Start time [%s] >= [%s] min timestamp required in this run. Segment will be picked up in a future run.",
            entry,
-           DateTimes.utc(intervalStart),
+           intervalStart,
            minTimestampAsDate
        );
      }
@@ -828,7 +876,7 @@ private void mergeAndPush()

    log.info("Found [%,d] sinks to persist and merge", sinksToPush.size());

-   for (final Map.Entry<Long, Sink> entry : sinksToPush) {
+   for (final Map.Entry<SegmentIdentifier, Sink> entry : sinksToPush) {
      persistAndMerge(entry.getKey(), entry.getValue());
    }
  }
@@ -838,17 +886,17 @@ private void mergeAndPush()
   * from the single-threaded mergeExecutor, since otherwise chaos may ensue if merged segments are deleted while
   * being created.
   *
-  * @param truncatedTime sink key
+  * @param identifier sink key
   * @param sink       sink to unannounce
   */
- protected void abandonSegment(final long truncatedTime, final Sink sink)
+ protected void abandonSegment(final SegmentIdentifier identifier, final Sink sink)
  {
-   if (sinks.containsKey(truncatedTime)) {
+   if (sinks.containsKey(identifier)) {
      try {
        segmentAnnouncer.unannounceSegment(sink.getSegment());
        removeSegment(sink, computePersistDir(schema, sink.getInterval()));
-       log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier());
-       sinks.remove(truncatedTime);
+       log.info("Removing sinkKey %s for segment %s", identifier, sink.getSegment().getIdentifier());
+       sinks.remove(identifier);
        metrics.setSinkCount(sinks.size());
        sinkTimeline.remove(
            sink.getInterval(),
@@ -894,14 +942,14 @@ protected File computePersistDir(DataSchema schema, Interval interval)
   *
   * @param indexToPersist hydrant to persist
   * @param schema         datasource schema
-  * @param interval       interval to persist
+  * @param identifier     segment identifier to persist
   *
   * @return the number of rows persisted
   */
  protected int persistHydrant(
      FireHydrant indexToPersist,
      DataSchema schema,
-     Interval interval,
+     SegmentIdentifier identifier,
      Map<String, Object> metadataElems
  )
  {
@@ -909,7 +957,7 @@ protected int persistHydrant(
      if (indexToPersist.hasSwapped()) {
        log.info(
            "DataSource[%s], Interval[%s], Hydrant[%s] already swapped.
Ignoring request to persist.", - schema.getDataSource(), interval, indexToPersist + schema.getDataSource(), identifier.getInterval(), indexToPersist ); return 0; } @@ -917,7 +965,7 @@ protected int persistHydrant( log.info( "DataSource[%s], Interval[%s], Metadata [%s] persisting Hydrant[%s]", schema.getDataSource(), - interval, + identifier.getInterval(), metadataElems, indexToPersist ); @@ -927,13 +975,22 @@ protected int persistHydrant( final IndexSpec indexSpec = config.getIndexSpec(); indexToPersist.getIndex().getMetadata().putAll(metadataElems); + + final File persistDir = computePersistDir(schema, identifier.getInterval()); final File persistedFile = indexMerger.persist( indexToPersist.getIndex(), - interval, - new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), + identifier.getInterval(), + new File(persistDir, String.valueOf(indexToPersist.getCount())), indexSpec ); + final File identifierFile = new File(persistDir, IDENTIFIER_FILE_NAME); + objectMapper.writeValue(identifierFile, identifier); + + final File segmentTrackingMetadataFile = new File(persistDir, SEGMENT_TRACKING_FILE_NAME); + final SegmentTrackerMetadata segmentTrackerMetadata = segmentTracker.wrapMetadata(null); + objectMapper.writeValue(segmentTrackingMetadataFile, segmentTrackerMetadata); + indexToPersist.swapSegment( new QueryableIndexSegment( indexToPersist.getSegment().getIdentifier(), @@ -944,7 +1001,7 @@ protected int persistHydrant( } catch (IOException e) { log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource()) - .addData("interval", interval) + .addData("interval", identifier.getInterval()) .addData("count", indexToPersist.getCount()) .emit(); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 8aaf0f2bf7a2..c16f2a86a247 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -35,6 +35,7 @@ import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.SegmentPublisher; +import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.server.coordination.DataSegmentAnnouncer; import java.util.concurrent.ExecutorService; @@ -89,6 +90,7 @@ public RealtimePlumberSchool( @Override public Plumber findPlumber( + final SegmentAllocator segmentAllocator, final DataSchema schema, final RealtimeTuningConfig config, final FireDepartmentMetrics metrics @@ -111,7 +113,8 @@ public Plumber findPlumber( indexIO, cache, cacheConfig, - objectMapper + objectMapper, + segmentAllocator ); } diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 90b1e66f4b24..aa4d2abd2113 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -70,6 +70,7 @@ import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; import 
io.druid.segment.realtime.plumber.Sink; @@ -167,7 +168,7 @@ public Firehose connect(InputRowParser parser, File temporaryDirectory) throws I { @Override public Plumber findPlumber( - DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + SegmentAllocator segmentAllocator, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics ) { return plumber; @@ -181,7 +182,7 @@ public Plumber findPlumber( { @Override public Plumber findPlumber( - DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + SegmentAllocator segmentAllocator, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics ) { return plumber2; @@ -373,12 +374,12 @@ public Firehose connect(InputRowParser parser, File temporaryDirectory) throws I return firehose; } }, - (schema, config, metrics) -> plumber, + (segmentAllocator, schema, config, metrics) -> plumber, null ); RealtimeIOConfig ioConfig2 = new RealtimeIOConfig( null, - (schema, config, metrics) -> plumber2, + (segmentAllocator, schema, config, metrics) -> plumber2, (parser, arg) -> firehoseV2 ); @@ -417,7 +418,7 @@ public Firehose connect(InputRowParser parser, File temporaryDirectory) throws I return firehose; } }, - (schema, config, metrics) -> plumber, + (segmentAllocator, schema, config, metrics) -> plumber, null ); @@ -1043,7 +1044,7 @@ public Object startJob() } @Override - public int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException + public int add(InputRow row, String sequenceName, Supplier committerSupplier) throws IndexSizeExceededException { if (row == null) { return -1; diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java index e1237ef16966..73a8fa35b6b2 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java @@ -41,7 +41,6 @@ import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestCommitterSupplier; -import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestSegmentAllocator; import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestSegmentHandoffNotifierFactory; import io.druid.timeline.DataSegment; import org.hamcrest.CoreMatchers; @@ -99,7 +98,7 @@ public class AppenderatorDriverFailTest @Before public void setUp() { - allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR); + allocator = new TestSegmentAllocator(DATA_SOURCE, "abc123", Granularities.HOUR); segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory(); } diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java index 774697fb47ec..11b29c81f3ed 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; import 
com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; @@ -38,7 +37,6 @@ import io.druid.java.util.common.Intervals; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.granularity.Granularities; -import io.druid.java.util.common.granularity.Granularity; import io.druid.query.SegmentDescriptor; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; @@ -48,7 +46,6 @@ import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.NumberedShardSpec; import io.druid.timeline.partition.PartitionChunk; -import org.joda.time.DateTime; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -57,13 +54,11 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class AppenderatorDriverTest @@ -103,7 +98,7 @@ public class AppenderatorDriverTest public void setUp() { appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY); - allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR); + allocator = new TestSegmentAllocator(DATA_SOURCE, VERSION, Granularities.HOUR); segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory(); driver = new AppenderatorDriver( appenderatorTester.getAppenderator(), @@ -398,42 +393,6 @@ public void run() } } - static class TestSegmentAllocator implements SegmentAllocator - { - private final String dataSource; - private final Granularity granularity; - private final Map counters = Maps.newHashMap(); - - public TestSegmentAllocator(String dataSource, Granularity granularity) - { - this.dataSource = dataSource; - this.granularity = granularity; - } - - @Override - public SegmentIdentifier allocate( - final InputRow row, - final String sequenceName, - final String previousSegmentId - ) throws IOException - { - synchronized (counters) { - DateTime dateTimeTruncated = granularity.bucketStart(row.getTimestamp()); - final long timestampTruncated = dateTimeTruncated.getMillis(); - if (!counters.containsKey(timestampTruncated)) { - counters.put(timestampTruncated, new AtomicInteger()); - } - final int partitionNum = counters.get(timestampTruncated).getAndIncrement(); - return new SegmentIdentifier( - dataSource, - granularity.bucket(dateTimeTruncated), - VERSION, - new NumberedShardSpec(partitionNum, 0) - ); - } - } - } - static class TestSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory { private boolean handoffEnabled = true; diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index 3a6dc6dcfb0a..ea4360442dc7 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -112,17 +112,17 @@ public void testSimpleIngestion() throws Exception commitMetadata.put("x", "1"); Assert.assertEquals( 1, - plumber.add(rows[0], null)); + plumber.add(rows[0], null, null)); commitMetadata.put("x", "2"); 
Assert.assertEquals( 2, - plumber.add(rows[1], null)); + plumber.add(rows[1], null, null)); commitMetadata.put("x", "3"); Assert.assertEquals( 3, - plumber.add(rows[2], null)); + plumber.add(rows[2], null, null)); Assert.assertEquals(1, plumber.getSegmentsView().size()); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocatorTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocatorTest.java new file mode 100644 index 000000000000..3f7d797d2fbf --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocatorTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.appenderator; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.DateTimes; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.granularity.Granularities; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.realtime.plumber.CustomVersioningPolicy; +import io.druid.timeline.partition.LinearShardSpec; +import io.druid.timeline.partition.ShardSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +public class SpecBasedSegmentAllocatorTest +{ + + @Test + public void basicTest() throws Exception + { + DataSchema schema = new DataSchema( + "dataSource", + Collections.emptyMap(), + new AggregatorFactory[] {}, + new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), + new DefaultObjectMapper() + ); + + ShardSpec shardSpec = new LinearShardSpec(1); + RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( + 0, + null, + null, + null, + new CustomVersioningPolicy("testing"), + null, + null, + shardSpec, + null, + null, + 0, + 0, + null, + null, + null + ); + + SegmentAllocator allocator = new SpecBasedSegmentAllocator(schema, tuningConfig); + + InputRow row = new MapBasedInputRow( + DateTimes.of("2017-09-10T12:35:00.000Z"), + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo", "met1", "1") + ); + + SegmentIdentifier identifier = allocator.allocate(row, null, null); + + Assert.assertEquals("dataSource", identifier.getDataSource()); + Assert.assertEquals("testing", identifier.getVersion()); + Assert.assertEquals(shardSpec, identifier.getShardSpec()); + Assert.assertEquals( + 
Intervals.of("2017-09-10T00:00:00.000Z/2017-09-11T00:00:00.000Z"), + identifier.getInterval() + ); + } + +} diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/TestSegmentAllocator.java b/server/src/test/java/io/druid/segment/realtime/appenderator/TestSegmentAllocator.java new file mode 100644 index 000000000000..517f7fcfbf02 --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/TestSegmentAllocator.java @@ -0,0 +1,68 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.appenderator; + +import com.google.common.collect.Maps; +import io.druid.data.input.InputRow; +import io.druid.java.util.common.granularity.Granularity; +import io.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class TestSegmentAllocator implements SegmentAllocator +{ + private final String dataSource; + private final String version; + private final Granularity granularity; + private final Map counters = Maps.newHashMap(); + + public TestSegmentAllocator(String dataSource, String version, Granularity granularity) + { + this.dataSource = dataSource; + this.version = version; + this.granularity = granularity; + } + + @Override + public SegmentIdentifier allocate( + final InputRow row, + final String sequenceName, + final String previousSegmentId + ) throws IOException + { + synchronized (counters) { + DateTime dateTimeTruncated = granularity.bucketStart(row.getTimestamp()); + final long timestampTruncated = dateTimeTruncated.getMillis(); + if (!counters.containsKey(timestampTruncated)) { + counters.put(timestampTruncated, new AtomicInteger()); + } + final int partitionNum = counters.get(timestampTruncated).getAndIncrement(); + return new SegmentIdentifier( + dataSource, + granularity.bucket(dateTimeTruncated), + version, + new NumberedShardSpec(partitionNum, 0) + ); + } + } +} diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 414dd8977389..9855f7f34223 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -55,8 +55,11 @@ import io.druid.segment.realtime.FireDepartmentTest; import io.druid.segment.realtime.FireHydrant; import io.druid.segment.realtime.SegmentPublisher; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.server.coordination.DataSegmentAnnouncer; import 
io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; import org.apache.commons.io.FileUtils; import org.easymock.EasyMock; import org.joda.time.DateTime; @@ -97,6 +100,9 @@ public class RealtimePlumberSchoolTest private DataSchema schema2; private FireDepartmentMetrics metrics; private File tmpDir; + private SegmentAllocator segmentAllocator; + private SegmentIdentifier segmentIdentifier; + private String sequenceName; public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy) { @@ -182,6 +188,8 @@ public void setUp() throws Exception ) ).andReturn(true).anyTimes(); + segmentAllocator = EasyMock.createMock(SegmentAllocator.class); + emitter = EasyMock.createMock(ServiceEmitter.class); EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter); @@ -220,7 +228,9 @@ public void setUp() throws Exception ); metrics = new FireDepartmentMetrics(); - plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, metrics); + plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(segmentAllocator, schema, tuningConfig, metrics); + + sequenceName = String.valueOf(System.nanoTime()); } @After @@ -248,17 +258,25 @@ public void testPersistWithCommitMetadata() throws Exception final Object commitMetadata = "dummyCommitMetadata"; testPersist(commitMetadata); - plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, metrics); + plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(segmentAllocator, schema, tuningConfig, metrics); Assert.assertEquals(commitMetadata, plumber.startJob()); } private void testPersist(final Object commitMetadata) throws Exception { + Interval segmentInterval = Intervals.utc(0, TimeUnit.HOURS.toMillis(1)); + segmentIdentifier = new SegmentIdentifier( + schema.getDataSource(), + segmentInterval, + "version", + tuningConfig.getShardSpec() + ); + plumber.getSinks() .put( - 0L, + segmentIdentifier, new Sink( - Intervals.utc(0, TimeUnit.HOURS.toMillis(1)), + segmentInterval, schema, tuningConfig.getShardSpec(), DateTimes.of("2014-12-01T12:34:56.789").toString(), @@ -273,6 +291,11 @@ private void testPersist(final Object commitMetadata) throws Exception EasyMock.expect(row.getDimensions()).andReturn(new ArrayList()); EasyMock.replay(row); + EasyMock.expect( + segmentAllocator.allocate(EasyMock.anyObject(InputRow.class), EasyMock.anyString(), EasyMock.anyString()) + ).andReturn(segmentIdentifier); + EasyMock.replay(segmentAllocator); + final CountDownLatch doneSignal = new CountDownLatch(1); final Committer committer = new Committer() @@ -289,7 +312,7 @@ public void run() doneSignal.countDown(); } }; - plumber.add(row, Suppliers.ofInstance(committer)); + plumber.add(row, sequenceName, Suppliers.ofInstance(committer)); plumber.persist(committer); doneSignal.await(); @@ -301,11 +324,23 @@ public void run() @Test(timeout = 60000) public void testPersistFails() throws Exception { + Interval segmentInterval = Intervals.utc(0, TimeUnit.HOURS.toMillis(1)); + segmentIdentifier = new SegmentIdentifier( + schema.getDataSource(), + segmentInterval, + "version", + tuningConfig.getShardSpec() + ); + EasyMock.expect( + segmentAllocator.allocate(EasyMock.anyObject(InputRow.class), EasyMock.anyString(), EasyMock.anyString()) + ).andReturn(segmentIdentifier); + EasyMock.replay(segmentAllocator); + plumber.getSinks() .put( - 0L, + segmentIdentifier, new Sink( - Intervals.utc(0, TimeUnit.HOURS.toMillis(1)), + segmentInterval, schema, 
tuningConfig.getShardSpec(), DateTimes.of("2014-12-01T12:34:56.789").toString(), @@ -318,7 +353,7 @@ public void testPersistFails() throws Exception EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L); EasyMock.expect(row.getDimensions()).andReturn(new ArrayList()); EasyMock.replay(row); - plumber.add(row, Suppliers.ofInstance(Committers.nil())); + plumber.add(row, sequenceName, Suppliers.ofInstance(Committers.nil())); final CountDownLatch doneSignal = new CountDownLatch(1); @@ -357,10 +392,19 @@ private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Ex { Interval testInterval = new Interval(DateTimes.of("1970-01-01"), DateTimes.of("1971-01-01")); - RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); + segmentIdentifier = new SegmentIdentifier(schema2.getDataSource(), testInterval, "test", NoneShardSpec.instance()); + + EasyMock.expect( + segmentAllocator.allocate(EasyMock.anyObject(InputRow.class), EasyMock.anyString(), EasyMock.anyString()) + ).andReturn( + segmentIdentifier + ); + EasyMock.replay(segmentAllocator); + + RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(segmentAllocator, schema2, tuningConfig, metrics); plumber2.getSinks() .put( - 0L, + segmentIdentifier, new Sink( testInterval, schema2, @@ -386,11 +430,11 @@ public void run() doneSignal.countDown(); } }; - plumber2.add(getTestInputRow("1970-01-01"), Suppliers.ofInstance(committer)); - plumber2.add(getTestInputRow("1970-02-01"), Suppliers.ofInstance(committer)); - plumber2.add(getTestInputRow("1970-03-01"), Suppliers.ofInstance(committer)); - plumber2.add(getTestInputRow("1970-04-01"), Suppliers.ofInstance(committer)); - plumber2.add(getTestInputRow("1970-05-01"), Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-01-01"), sequenceName, Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-02-01"), sequenceName, Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-03-01"), sequenceName, Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-04-01"), sequenceName, Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-05-01"), sequenceName, Suppliers.ofInstance(committer)); plumber2.persist(committer); @@ -409,17 +453,18 @@ public void run() FileUtils.deleteDirectory(new File(persistDir, "1")); FileUtils.deleteDirectory(new File(persistDir, "3")); RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber( + segmentAllocator, schema2, tuningConfig, metrics ); restoredPlumber.bootstrapSinksFromDisk(); - Map sinks = restoredPlumber.getSinks(); + Map sinks = restoredPlumber.getSinks(); Assert.assertEquals(1, sinks.size()); - List hydrants = Lists.newArrayList(sinks.get(new Long(0))); + List hydrants = Lists.newArrayList(sinks.get(segmentIdentifier)); DateTime startTime = DateTimes.of("1970-01-01T00:00:00.000Z"); Interval expectedInterval = new Interval(startTime, DateTimes.of("1971-01-01T00:00:00.000Z")); Assert.assertEquals(0, hydrants.get(0).getCount()); @@ -443,6 +488,7 @@ public void run() FileUtils.deleteDirectory(new File(persistDir, "2")); FileUtils.deleteDirectory(new File(persistDir, "4")); RealtimePlumber restoredPlumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber( + segmentAllocator, schema2, tuningConfig, metrics @@ -472,9 +518,20 @@ private void testDimOrderInheritanceHelper(final Object commitMetadata) throws E QueryableIndex qindex; FireHydrant hydrant; - Map sinks; + Map 
sinks; + + Interval testInterval = new Interval(DateTimes.of("1970-01-01"), DateTimes.of("1971-01-01")); + + segmentIdentifier = new SegmentIdentifier(schema2.getDataSource(), testInterval, "test", NoneShardSpec.instance()); + + EasyMock.expect( + segmentAllocator.allocate(EasyMock.anyObject(InputRow.class), EasyMock.anyString(), EasyMock.anyString()) + ).andReturn( + segmentIdentifier + ); + EasyMock.replay(segmentAllocator); - RealtimePlumber plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); + RealtimePlumber plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(segmentAllocator, schema2, tuningConfig, metrics); Assert.assertNull(plumber.startJob()); final CountDownLatch doneSignal = new CountDownLatch(1); @@ -500,6 +557,7 @@ public void run() ImmutableList.of("dimD"), ImmutableList.of("1") ), + sequenceName, Suppliers.ofInstance(committer) ); plumber.add( @@ -508,6 +566,7 @@ public void run() ImmutableList.of("dimC"), ImmutableList.of("1") ), + sequenceName, Suppliers.ofInstance(committer) ); plumber.add( @@ -516,6 +575,7 @@ public void run() ImmutableList.of("dimA"), ImmutableList.of("1") ), + sequenceName, Suppliers.ofInstance(committer) ); plumber.add( @@ -524,6 +584,7 @@ public void run() ImmutableList.of("dimB"), ImmutableList.of("1") ), + sequenceName, Suppliers.ofInstance(committer) ); plumber.add( @@ -532,6 +593,7 @@ public void run() ImmutableList.of("dimE"), ImmutableList.of("1") ), + sequenceName, Suppliers.ofInstance(committer) ); plumber.add( @@ -540,6 +602,7 @@ public void run() ImmutableList.of("dimA", "dimB", "dimC", "dimD", "dimE"), ImmutableList.of("1") ), + sequenceName, Suppliers.ofInstance(committer) ); @@ -551,6 +614,7 @@ public void run() plumber.finishJob(); RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber( + segmentAllocator, schema2, tuningConfig, metrics @@ -559,7 +623,7 @@ public void run() sinks = restoredPlumber.getSinks(); Assert.assertEquals(1, sinks.size()); - List hydrants = Lists.newArrayList(sinks.get(0L)); + List hydrants = Lists.newArrayList(sinks.get(segmentIdentifier)); for (int i = 0; i < hydrants.size(); i++) { hydrant = hydrants.get(i); diff --git a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java index b83a93bdbe9d..0664aec2f3bb 100644 --- a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java @@ -37,6 +37,7 @@ import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; @@ -157,7 +158,7 @@ public void testTaskValidator() throws Exception { @Override public Plumber findPlumber( - DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + SegmentAllocator segmentAllocator, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics ) { return null; From aea47c29aee272b16a365668237a1e87519ab980 Mon Sep 17 00:00:00 2001 From: Kevin Conaway Date: Sun, 10 Sep 2017 13:48:12 -0400 Subject: [PATCH 3/6] Ensure that checkstyle only ignores the Logger import --- 
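Context on the mechanics of this fix: checkstyle's comment-based suppression treats the OFF/ON markers as a paired region toggle, and an OFF comment with no matching ON comment is assumed to suppress the Regexp check from that point to the end of the file, quietly exempting every later import as well. The intended pairing brackets exactly one legacy import:

    //CHECKSTYLE.OFF: Regexp
    import com.metamx.common.logger.Logger;
    //CHECKSTYLE.ON: Regexp

With the ON marker restored, only the com.metamx Logger import stays exempt; the EmittingLogger and emitter imports that follow it are checked again.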
.../io/druid/indexing/common/task/RealtimeIndexTaskTest.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 4c44cb599d25..82d38aac442f 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -33,7 +33,7 @@ import com.google.common.util.concurrent.MoreExecutors;
 //CHECKSTYLE.OFF: Regexp
 import com.metamx.common.logger.Logger;
-
+//CHECKSTYLE.ON: Regexp
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.core.LoggingEmitter;
 import com.metamx.emitter.service.ServiceEmitter;

From 14f2c34fb92196cf2f3a0450b2ff5c524ee1a903 Mon Sep 17 00:00:00 2001
From: Kevin Conaway
Date: Tue, 12 Sep 2017 10:09:38 -0400
Subject: [PATCH 4/6] RealtimeIndexTaskTest should use the actual #allocatePendingSegment API in IndexerSQLMetadataStorageCoordinator

---
 .../common/task/RealtimeIndexTaskTest.java | 124 +++++++++++-------
 1 file changed, 74 insertions(+), 50 deletions(-)

diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
index 82d38aac442f..d1c8a4bbbb88 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -27,7 +27,6 @@ import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -59,15 +58,15 @@ import io.druid.indexing.common.actions.TaskActionToolbox;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.config.TaskStorageConfig;
+import io.druid.indexing.overlord.DataSourceMetadata;
 import io.druid.indexing.overlord.HeapMemoryTaskStorage;
-import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.indexing.overlord.TaskLockbox;
 import io.druid.indexing.overlord.TaskStorage;
 import io.druid.indexing.overlord.supervisor.SupervisorManager;
 import io.druid.indexing.test.TestDataSegmentAnnouncer;
 import io.druid.indexing.test.TestDataSegmentKiller;
 import io.druid.indexing.test.TestDataSegmentPusher;
-import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
@@ -77,6 +76,8 @@ import io.druid.java.util.common.guava.Sequences;
 import io.druid.java.util.common.parsers.ParseException;
 import io.druid.metadata.EntryExistsException;
+import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
+import io.druid.metadata.TestDerbyConnector;
 import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import io.druid.query.Druids;
 import io.druid.query.IntervalChunkingQueryRunnerDecorator;
@@ -106,8 +107,6 @@ import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
 import io.druid.segment.loading.StorageLocationConfig;
 import
io.druid.segment.realtime.FireDepartment; -import io.druid.segment.realtime.appenderator.SegmentAllocator; -import io.druid.segment.realtime.appenderator.TestSegmentAllocator; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory; import io.druid.segment.realtime.plumber.ServerTimeRejectionPolicyFactory; @@ -115,10 +114,10 @@ import io.druid.server.coordination.DataSegmentServerAnnouncer; import io.druid.server.coordination.ServerType; import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.LinearShardSpec; import org.easymock.EasyMock; import org.hamcrest.CoreMatchers; import org.joda.time.DateTime; -import org.joda.time.DateTimeZone; import org.joda.time.Period; import org.junit.After; import org.junit.Assert; @@ -135,9 +134,12 @@ import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -241,9 +243,13 @@ public Firehose connect(InputRowParser parser, File temporaryDirectory) throws I @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + @Rule + public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule(); + private DateTime now; private ListeningExecutorService taskExec; private Map> handOffCallbacks; + private Collection publishedSegments = new CopyOnWriteArrayList<>(); @Before public void setUp() @@ -252,6 +258,12 @@ public void setUp() emitter.start(); taskExec = MoreExecutors.listeningDecorator(Execs.singleThreaded("realtime-index-task-test-%d")); now = DateTimes.nowUtc(); + + TestDerbyConnector derbyConnector = derbyConnectorRule.getConnector(); + derbyConnector.createDataSourceTable(); + derbyConnector.createTaskTables(); + derbyConnector.createSegmentTable(); + derbyConnector.createPendingSegmentsTable(); } @After @@ -280,9 +292,8 @@ public void testDefaultResource() throws Exception @Test(timeout = 60_000L, expected = ExecutionException.class) public void testHandoffTimeout() throws Exception { - final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); final RealtimeIndexTask task = makeRealtimeTask(null, true, 100L); - final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); + final TaskToolbox taskToolbox = makeToolbox(task, tempFolder.newFolder()); final ListenableFuture statusFuture = runTask(task, taskToolbox); // Wait for firehose to show up, it starts off null. @@ -306,12 +317,12 @@ public void testHandoffTimeout() throws Exception firehose.close(); // Wait for publish. 
- while (mdc.getPublished().isEmpty()) { + while (publishedSegments.isEmpty()) { Thread.sleep(50); } Assert.assertEquals(1, task.getMetrics().processed()); - Assert.assertNotNull(Iterables.getOnlyElement(mdc.getPublished())); + Assert.assertNotNull(Iterables.getOnlyElement(publishedSegments)); // handoff would timeout, resulting in exception @@ -321,9 +332,9 @@ public void testHandoffTimeout() throws Exception @Test(timeout = 60_000L) public void testBasics() throws Exception { - final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); + final RealtimeIndexTask task = makeRealtimeTask(null); - final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); + final TaskToolbox taskToolbox = makeToolbox(task, tempFolder.newFolder()); final ListenableFuture statusFuture = runTask(task, taskToolbox); final DataSegment publishedSegment; @@ -358,11 +369,11 @@ public void testBasics() throws Exception firehose.close(); // Wait for publish. - while (mdc.getPublished().isEmpty()) { + while (publishedSegments.isEmpty()) { Thread.sleep(50); } - publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); + publishedSegment = Iterables.getOnlyElement(publishedSegments); // Check metrics. Assert.assertEquals(2, task.getMetrics().processed()); @@ -396,9 +407,8 @@ public void testBasics() throws Exception @Test(timeout = 60_000L) public void testReportParseExceptionsOnBadMetric() throws Exception { - final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); final RealtimeIndexTask task = makeRealtimeTask(null, true); - final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); + final TaskToolbox taskToolbox = makeToolbox(task, tempFolder.newFolder()); final ListenableFuture statusFuture = runTask(task, taskToolbox); // Wait for firehose to show up, it starts off null. @@ -462,9 +472,8 @@ public void testReportParseExceptionsOnBadMetric() throws Exception @Test(timeout = 60_000L) public void testNoReportParseExceptions() throws Exception { - final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); final RealtimeIndexTask task = makeRealtimeTask(null, false); - final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder()); + final TaskToolbox taskToolbox = makeToolbox(task, tempFolder.newFolder()); final ListenableFuture statusFuture = runTask(task, taskToolbox); final DataSegment publishedSegment; @@ -516,11 +525,11 @@ public void testNoReportParseExceptions() throws Exception firehose.close(); // Wait for publish. - while (mdc.getPublished().isEmpty()) { + while (publishedSegments.isEmpty()) { Thread.sleep(50); } - publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); + publishedSegment = Iterables.getOnlyElement(publishedSegments); // Check metrics. Assert.assertEquals(3, task.getMetrics().processed()); @@ -560,8 +569,7 @@ public void testRestore() throws Exception // First run: { - final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); - final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory); + final TaskToolbox taskToolbox = makeToolbox(task1, directory); final ListenableFuture statusFuture = runTask(task1, taskToolbox); // Wait for firehose to show up, it starts off null. @@ -589,14 +597,13 @@ public void testRestore() throws Exception Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode()); // Nothing should be published. 
- Assert.assertEquals(Sets.newHashSet(), mdc.getPublished()); + Assert.assertTrue(publishedSegments.isEmpty()); } // Second run: { - final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId()); - final TaskToolbox taskToolbox = makeToolbox(task2, mdc, directory); + final TaskToolbox taskToolbox = makeToolbox(task2, directory); final ListenableFuture statusFuture = runTask(task2, taskToolbox); // Wait for firehose to show up, it starts off null. @@ -623,11 +630,11 @@ public void testRestore() throws Exception firehose.close(); // Wait for publish. - while (mdc.getPublished().isEmpty()) { + while (publishedSegments.isEmpty()) { Thread.sleep(50); } - publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); + publishedSegment = Iterables.getOnlyElement(publishedSegments); // Do a query. Assert.assertEquals(2, sumMetric(task2, "rows")); @@ -657,14 +664,13 @@ public void testRestore() throws Exception public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception { final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null)); - final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); final File directory = tempFolder.newFolder(); final RealtimeIndexTask task1 = makeRealtimeTask(null); final DataSegment publishedSegment; // First run: { - final TaskToolbox taskToolbox = makeToolbox(task1, taskStorage, mdc, directory); + final TaskToolbox taskToolbox = makeToolbox(task1, taskStorage, directory); final ListenableFuture statusFuture = runTask(task1, taskToolbox); // Wait for firehose to show up, it starts off null. @@ -688,11 +694,11 @@ public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception firehose.close(); // Wait for publish. - while (mdc.getPublished().isEmpty()) { + while (publishedSegments.isEmpty()) { Thread.sleep(50); } - publishedSegment = Iterables.getOnlyElement(mdc.getPublished()); + publishedSegment = Iterables.getOnlyElement(publishedSegments); // Do a query. Assert.assertEquals(1, sumMetric(task1, "rows")); @@ -709,7 +715,7 @@ public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception // Second run: { final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId()); - final TaskToolbox taskToolbox = makeToolbox(task2, taskStorage, mdc, directory); + final TaskToolbox taskToolbox = makeToolbox(task2, taskStorage, directory); final ListenableFuture statusFuture = runTask(task2, taskToolbox); // Wait for firehose to show up, it starts off null. @@ -724,7 +730,7 @@ public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception firehose.close(); // publishedSegment is still published. No reason it shouldn't be. - Assert.assertEquals(ImmutableSet.of(publishedSegment), mdc.getPublished()); + Assert.assertEquals(ImmutableSet.of(publishedSegment), ImmutableSet.copyOf(publishedSegments)); // Wait for a handoffCallback to show up. while (handOffCallbacks.isEmpty()) { @@ -760,8 +766,7 @@ public void testRestoreCorruptData() throws Exception // First run: { - final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); - final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory); + final TaskToolbox taskToolbox = makeToolbox(task1, directory); final ListenableFuture statusFuture = runTask(task1, taskToolbox); // Wait for firehose to show up, it starts off null. 
@@ -789,7 +794,7 @@ public void testRestoreCorruptData() throws Exception Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode()); // Nothing should be published. - Assert.assertEquals(Sets.newHashSet(), mdc.getPublished()); + Assert.assertTrue(publishedSegments.isEmpty()); } // Corrupt the data: @@ -808,9 +813,8 @@ public void testRestoreCorruptData() throws Exception // Second run: { - final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId()); - final TaskToolbox taskToolbox = makeToolbox(task2, mdc, directory); + final TaskToolbox taskToolbox = makeToolbox(task2, directory); final ListenableFuture statusFuture = runTask(task2, taskToolbox); // Wait for the task to finish. @@ -832,8 +836,7 @@ public void testStopBeforeStarting() throws Exception final RealtimeIndexTask task1 = makeRealtimeTask(null); task1.stopGracefully(); - final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator(); - final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory); + final TaskToolbox taskToolbox = makeToolbox(task1, directory); final ListenableFuture statusFuture = runTask(task1, taskToolbox); // Wait for the task to finish. @@ -920,26 +923,17 @@ protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory) { return true; } - - @Override - protected SegmentAllocator createSegmentAllocator(TaskToolbox toolbox) - { - String version = DateTime.now(DateTimeZone.UTC).toString(); - return new TestSegmentAllocator(dataSchema.getDataSource(), version, granularitySpec.getSegmentGranularity()); - } }; } private TaskToolbox makeToolbox( final Task task, - final IndexerMetadataStorageCoordinator mdc, final File directory ) { return makeToolbox( task, new HeapMemoryTaskStorage(new TaskStorageConfig(null)), - mdc, directory ); } @@ -947,10 +941,40 @@ private TaskToolbox makeToolbox( private TaskToolbox makeToolbox( final Task task, final TaskStorage taskStorage, - final IndexerMetadataStorageCoordinator mdc, final File directory ) { + ObjectMapper mapper = new DefaultObjectMapper(); + mapper.registerSubtypes(LinearShardSpec.class); + IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator( + mapper, + derbyConnectorRule.metadataTablesConfigSupplier().get(), + derbyConnectorRule.getConnector() + ) + { + @Override + public Set announceHistoricalSegments(Set segments) throws IOException + { + Set result = super.announceHistoricalSegments(segments); + + publishedSegments.addAll(result); + + return result; + } + + @Override + public SegmentPublishResult announceHistoricalSegments( + Set segments, DataSourceMetadata startMetadata, DataSourceMetadata endMetadata + ) throws IOException + { + SegmentPublishResult result = super.announceHistoricalSegments(segments, startMetadata, endMetadata); + + publishedSegments.addAll(result.getSegments()); + + return result; + } + }; + final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null); final TaskLockbox taskLockbox = new TaskLockbox(taskStorage); try { From 3a6318eceea805b58916adf5f788d7cff096837e Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Thu, 21 Sep 2017 02:55:09 -0500 Subject: [PATCH 5/6] Remove SegmentLoaderConfig.numLoadingThreads config (#4831) --- docs/content/configuration/historical.md | 3 +-- .../druid/segment/loading/SegmentLoaderConfig.java | 12 +++--------- 
From 3a6318eceea805b58916adf5f788d7cff096837e Mon Sep 17 00:00:00 2001
From: Roman Leventov
Date: Thu, 21 Sep 2017 02:55:09 -0500
Subject: [PATCH 5/6] Remove SegmentLoaderConfig.numLoadingThreads config (#4831)

---
 docs/content/configuration/historical.md             |  3 +--
 .../druid/segment/loading/SegmentLoaderConfig.java   | 12 +++---------
 .../io/druid/server/coordination/ZkCoordinator.java  |  2 +-
 .../druid/server/coordination/ZkCoordinatorTest.java | 12 ------------
 4 files changed, 5 insertions(+), 24 deletions(-)

diff --git a/docs/content/configuration/historical.md b/docs/content/configuration/historical.md
index 28319fd230f3..c972cbe65d1f 100644
--- a/docs/content/configuration/historical.md
+++ b/docs/content/configuration/historical.md
@@ -36,8 +36,7 @@ The historical node uses several of the global configs in [Configuration](../con
 |`druid.segmentCache.dropSegmentDelayMillis`|How long a node delays before completely dropping segment.|30000 (30 seconds)|
 |`druid.segmentCache.infoDir`|Historical nodes keep track of the segments they are serving so that when the process is restarted they can reload the same segments without waiting for the Coordinator to reassign. This path defines where this metadata is kept. Directory will be created if needed.|${first_location}/info_dir|
 |`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)|
-|`druid.segmentCache.numLoadingThreads`|How many segments to load concurrently from from deep storage.|1|
-|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently from local storage at startup.|Same as numLoadingThreads|
+|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently from local storage at startup.|1|

 ### Query Configs

diff --git a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
index f67cd649bfa0..a0d66c1f402e 100644
--- a/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
+++ b/server/src/main/java/io/druid/segment/loading/SegmentLoaderConfig.java
@@ -31,6 +31,8 @@
  */
 public class SegmentLoaderConfig
 {
+  private static final int DEFAULT_NUM_BOOTSTRAP_THREADS = 1;
+
   @JsonProperty
   @NotEmpty
   private List<StorageLocationConfig> locations = null;
@@ -44,9 +46,6 @@ public class SegmentLoaderConfig
   @JsonProperty("announceIntervalMillis")
   private int announceIntervalMillis = 0; // do not background announce

-  @JsonProperty("numLoadingThreads")
-  private int numLoadingThreads = 1;
-
   @JsonProperty("numBootstrapThreads")
   private Integer numBootstrapThreads = null;
@@ -73,14 +72,9 @@ public int getAnnounceIntervalMillis()
     return announceIntervalMillis;
   }

-  public int getNumLoadingThreads()
-  {
-    return numLoadingThreads;
-  }
-
   public int getNumBootstrapThreads()
   {
-    return numBootstrapThreads == null ? numLoadingThreads : numBootstrapThreads;
+    return numBootstrapThreads == null ? DEFAULT_NUM_BOOTSTRAP_THREADS : numBootstrapThreads;
   }

   public File getInfoDir()
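The fallback above relies on `numBootstrapThreads` being a boxed `Integer`, so an omitted `druid.segmentCache.numBootstrapThreads` property is distinguishable from an explicit value. A standalone sketch of the same idiom with plain Jackson; the `Cfg` class is a stand-in for `SegmentLoaderConfig`, with its other fields omitted:

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class NumBootstrapThreadsDemo
{
  // Stand-in for the relevant slice of SegmentLoaderConfig.
  public static class Cfg
  {
    private static final int DEFAULT_NUM_BOOTSTRAP_THREADS = 1;

    @JsonProperty("numBootstrapThreads")
    private Integer numBootstrapThreads = null;

    public int getNumBootstrapThreads()
    {
      // null means the property was absent; a primitive int could not tell
      // "unset" apart from an explicit 0.
      return numBootstrapThreads == null ? DEFAULT_NUM_BOOTSTRAP_THREADS : numBootstrapThreads;
    }
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    System.out.println(mapper.readValue("{}", Cfg.class).getNumBootstrapThreads());                          // prints 1
    System.out.println(mapper.readValue("{\"numBootstrapThreads\":8}", Cfg.class).getNumBootstrapThreads()); // prints 8
  }
}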
diff --git a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
index 4ae8887694eb..0d6fc933f9a7 100644
--- a/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
+++ b/server/src/main/java/io/druid/server/coordination/ZkCoordinator.java
@@ -127,7 +127,7 @@ public void start() throws IOException
         loadQueueLocation,
         true,
         true,
-        Execs.multiThreaded(config.getNumLoadingThreads(), "ZkCoordinator-%s")
+        Execs.singleThreaded("ZkCoordinator-%s")
     );

     try {
diff --git a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java
index dc14592b820b..fbd407265280 100644
--- a/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java
+++ b/server/src/test/java/io/druid/server/coordination/ZkCoordinatorTest.java
@@ -202,12 +202,6 @@ public File getInfoDir()
             return infoDir;
           }

-          @Override
-          public int getNumLoadingThreads()
-          {
-            return 5;
-          }
-
           @Override
           public int getAnnounceIntervalMillis()
           {
@@ -493,12 +487,6 @@ public File getInfoDir()
             return infoDir;
           }

-          @Override
-          public int getNumLoadingThreads()
-          {
-            return 5;
-          }
-
           @Override
           public int getAnnounceIntervalMillis()
           {

From 5ef8fa5981003c45c2703cbeacc832a9eb08b847 Mon Sep 17 00:00:00 2001
From: Kevin Conaway
Date: Wed, 27 Sep 2017 10:07:00 -0400
Subject: [PATCH 6/6] Segment handoffs should use the allocated partition number

---
 .../java/io/druid/segment/realtime/plumber/RealtimePlumber.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
index d4147cc28986..a08348817e31 100644
--- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java
@@ -492,7 +492,7 @@ public void doRun()
             }
         );
         handoffNotifier.registerSegmentHandoffCallback(
-            new SegmentDescriptor(sink.getInterval(), sink.getVersion(), config.getShardSpec().getPartitionNum()),
+            new SegmentDescriptor(sink.getInterval(), sink.getVersion(), identifier.getShardSpec().getPartitionNum()),
             mergeExecutor,
             new Runnable()
             {
               @Override
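The one-line change above takes the handoff descriptor's partition number from the allocated segment identifier's shard spec rather than from the task's static config. Handoff notifications are matched on (interval, version, partition), so if the allocator assigned a partition other than the configured one, the old descriptor could never match the published segment and the callback would never fire. A standalone illustration of that matching failure; the key encoding, interval, and partition values are invented for the example and are not Druid's actual classes:

import java.util.HashMap;
import java.util.Map;

public class HandoffKeyDemo
{
  // Handoff callbacks are keyed by (interval, version, partition); encode
  // the triple as a string for this illustration.
  private static String key(String interval, String version, int partition)
  {
    return interval + "/" + version + "/" + partition;
  }

  public static void main(String[] args)
  {
    final Map<String, Runnable> callbacks = new HashMap<>();

    final int configuredPartition = 0; // what the task's shard spec config says
    final int allocatedPartition = 3;  // what the segment allocator actually assigned

    // Pre-patch behavior: register the callback under the configured partition.
    callbacks.put(key("2017-09-27/P1D", "v1", configuredPartition), () -> System.out.println("handed off"));

    // The coordinator announces handoff of the segment that really exists.
    final String announced = key("2017-09-27/P1D", "v1", allocatedPartition);
    System.out.println(callbacks.containsKey(announced)); // false: the callback would never fire
  }
}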