From 997e0bd436ad9c56df66bc65707b4ba11bc3df6d Mon Sep 17 00:00:00 2001
From: Amatya
Date: Sun, 13 Oct 2024 18:33:14 +0530
Subject: [PATCH] Cache DataSegments on the Overlord

---
 .../common/actions/SegmentAllocateAction.java |  10 +-
 .../actions/SegmentAllocationQueue.java       |   4 +-
 .../druid/indexing/overlord/TaskLockbox.java  |  11 ++
 .../actions/SegmentAllocateActionTest.java    |  83 +++++++++
 .../druid/timeline/SegmentTimeline.java       |   8 +
 server/pom.xml                                |   6 +
 .../IndexerSQLMetadataStorageCoordinator.java |  94 +++++++++--
 .../apache/druid/metadata/SegmentCache.java   | 157 ++++++++++++++++++
 8 files changed, 358 insertions(+), 15 deletions(-)
 create mode 100644 server/src/main/java/org/apache/druid/metadata/SegmentCache.java

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
index 902dad5dd879..b4ca41955482 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java
@@ -37,6 +37,7 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NumberedPartialShardSpec;
@@ -234,7 +235,13 @@ public SegmentIdWithShardSpec perform(
     final Interval rowInterval = queryGranularity.bucket(timestamp).withChronology(ISOChronology.getInstanceUTC());
 
+    // Read from the segment cache when the coordinator provides one, else query the store.
     final Set<DataSegment> usedSegmentsForRow =
-        new HashSet<>(msc.retrieveUsedSegmentsForInterval(dataSource, rowInterval, Segments.ONLY_VISIBLE));
+        new HashSet<>(
+            msc instanceof IndexerSQLMetadataStorageCoordinator
+            ? ((IndexerSQLMetadataStorageCoordinator) msc)
+                .retrieveUsedSegmentsForIntervalsFromCache(dataSource, rowInterval, Segments.ONLY_VISIBLE)
+            : msc.retrieveUsedSegmentsForInterval(dataSource, rowInterval, Segments.ONLY_VISIBLE)
+        );
 
     final SegmentIdWithShardSpec identifier;
     if (usedSegmentsForRow.isEmpty()) {
@@ -251,7 +258,13 @@ public SegmentIdWithShardSpec perform(
         // and if it's different, repeat.
 
+        // Read from the segment cache when the coordinator provides one, else query the store.
         Set<DataSegment> newUsedSegmentsForRow =
-            new HashSet<>(msc.retrieveUsedSegmentsForInterval(dataSource, rowInterval, Segments.ONLY_VISIBLE));
+            new HashSet<>(
+                msc instanceof IndexerSQLMetadataStorageCoordinator
+                ? ((IndexerSQLMetadataStorageCoordinator) msc)
+                    .retrieveUsedSegmentsForIntervalsFromCache(dataSource, rowInterval, Segments.ONLY_VISIBLE)
+                : msc.retrieveUsedSegmentsForInterval(dataSource, rowInterval, Segments.ONLY_VISIBLE)
+            );
         if (!newUsedSegmentsForRow.equals(usedSegmentsForRow)) {
           if (attempt < MAX_ATTEMPTS) {
             final long shortRandomSleep = 50 + (long) (ThreadLocalRandom.current().nextDouble() * 450);
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
index 98ab50cff788..da23f0f976c7 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocationQueue.java
@@ -38,6 +38,7 @@
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
@@ -380,6 +381,16 @@ private boolean processBatch(AllocateRequestBatch requestBatch)
 
   private Set<DataSegment> retrieveUsedSegments(AllocateRequestKey key)
   {
+    // Only the SQL-backed coordinator has a segment cache; others use the regular lookup below.
+    if (metadataStorage instanceof IndexerSQLMetadataStorageCoordinator) {
+      return new HashSet<>(
+          ((IndexerSQLMetadataStorageCoordinator) metadataStorage).retrieveUsedSegmentsForIntervalsFromCache(
+              key.dataSource,
+              key.preferredAllocationInterval,
+              Segments.ONLY_VISIBLE
+          )
+      );
+    }
     return new HashSet<>(
         metadataStorage.retrieveUsedSegmentsForInterval(
             key.dataSource,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index bebb52157d6f..a3d6901d40cd 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -48,6 +48,7 @@
 import org.apache.druid.java.util.common.UOE;
 import org.apache.druid.java.util.common.guava.Comparators;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
 import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.metadata.ReplaceTaskLock;
 import org.apache.druid.query.QueryContexts;
@@ -168,6 +169,8 @@ public int compare(Pair left, Pair right)
       final Set<String> failedToReacquireLockTaskGroups = new HashSet<>();
       // Bookkeeping for a log message at the end
       int taskLockCount = 0;
+      // Lock intervals per datasource, used to load pending segments into the coordinator's cache
+      final Map<String, Set<Interval>> datasourceToActiveTaskLocks = new HashMap<>();
       for (final Pair<Task, TaskLock> taskAndLock : byVersionOrdering.sortedCopy(storedLocks)) {
         final Task task = Preconditions.checkNotNull(taskAndLock.lhs, "task");
         final TaskLock savedTaskLock = Preconditions.checkNotNull(taskAndLock.rhs, "savedTaskLock");
@@ -192,6 +195,8 @@ public int compare(Pair left, Pair right)
 
           if (savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) {
             taskLockCount++;
+            datasourceToActiveTaskLocks.computeIfAbsent(taskLock.getDataSource(), ds -> new HashSet<>())
+                                       .add(taskLock.getInterval());
             log.info(
                 "Reacquired lock[%s] for task: %s",
                 taskLock,
@@ -199,6 +204,8 @@ public int compare(Pair left, Pair right)
             );
           } else {
             taskLockCount++;
+            datasourceToActiveTaskLocks.computeIfAbsent(taskLock.getDataSource(), ds -> new HashSet<>())
+                                       .add(taskLock.getInterval());
             log.info(
                 "Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
                 savedTaskLockWithPriority.getInterval(),
@@ -245,6 +252,10 @@ public int compare(Pair left, Pair right)
                  + "as they failed to reacquire at least one lock.",
failedToReacquireLockTaskGroups);
       }
 
+      if (metadataStorageCoordinator instanceof IndexerSQLMetadataStorageCoordinator) {
+        ((IndexerSQLMetadataStorageCoordinator) metadataStorageCoordinator).initializeCache(datasourceToActiveTaskLocks);
+      }
+
       return new TaskLockboxSyncResult(tasksToFail);
     }
     finally {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
index fdb7fcd5595b..cf0dd4d4e751 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentAllocateActionTest.java
@@ -41,6 +41,8 @@
 import org.apache.druid.java.util.common.granularity.PeriodGranularity;
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
+import org.apache.druid.metadata.PendingSegmentRecord;
 import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
@@ -53,6 +55,8 @@
 import org.apache.druid.timeline.partition.ShardSpec;
 import org.easymock.EasyMock;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Interval;
 import org.joda.time.Period;
 import org.junit.After;
 import org.junit.Assert;
@@ -1168,6 +1172,104 @@ public void testSegmentIdMustNotBeReused()
     lockbox.unlock(task1, Intervals.ETERNITY);
   }
 
+  /**
+   * Manual benchmark: populates a year of daily intervals with used and pending segments and
+   * then times concurrent segment allocations across them. Currently disabled, see the TODO.
+   */
+  //@Test TODO uncomment
+  public void testBenchmark() throws Exception
+  {
+    final Task task = NoopTask.create();
+    taskActionTestKit.getTaskLockbox().add(task);
+
+    final Interval startInterval = Intervals.of("2024-01-01/2024-01-02");
+    final long dayToMillis = 1000L * 60L * 60L * 24L;
+    final long numDays = 365;
+    final List<Interval> intervals = new ArrayList<>();
+    for (long day = 0; day < numDays; day++) {
+      intervals.add(
+          new Interval(
+              startInterval.getStartMillis() + dayToMillis * day,
+              startInterval.getEndMillis() + dayToMillis * day,
+              DateTimeZone.UTC
+          )
+      );
+    }
+
+    final IndexerSQLMetadataStorageCoordinator coordinator =
+        (IndexerSQLMetadataStorageCoordinator) taskActionTestKit.getMetadataStorageCoordinator();
+    final Set<DataSegment> segments = new HashSet<>();
+    final List<PendingSegmentRecord> pendingSegments = new ArrayList<>();
+    final int numUsedSegmentsPerInterval = 2000;
+    final int numPendingSegmentsPerInterval = 1000;
+    int version = 0;
+    for (Interval interval : intervals) {
+      for (int i = 0; i < numUsedSegmentsPerInterval; i++) {
+        segments.add(getSegmentForIdentifier(
+            new SegmentIdWithShardSpec(
+                DATA_SOURCE,
+                interval,
+                "version" + version,
+                new NumberedShardSpec(i, numUsedSegmentsPerInterval)
+            )
+        ));
+      }
+      coordinator.commitSegments(segments, null);
+      segments.clear();
+      for (int i = numUsedSegmentsPerInterval; i < numUsedSegmentsPerInterval + numPendingSegmentsPerInterval; i++) {
+        pendingSegments.add(new PendingSegmentRecord(
+            new SegmentIdWithShardSpec(
+                DATA_SOURCE,
+                interval,
+                "version" + version,
+                new NumberedShardSpec(i, numUsedSegmentsPerInterval)
+            ),
+            "sequence" + i,
+            "random_" + ThreadLocalRandom.current().nextInt(),
+            null,
+            null
+        ));
+      }
+      coordinator.insertPendingSegments(DATA_SOURCE, pendingSegments, true);
+      pendingSegments.clear();
+      version = (version + 1) % 10;
+    }
+
+    final int numRequestsPerInterval = 20;
+    final List<Callable<SegmentIdWithShardSpec>> allocateTasks = new ArrayList<>();
+    for (Interval interval : intervals) {
+      for (int i = 0; i < numRequestsPerInterval; i++) {
+        final String sequenceId = "sequence" + (numPendingSegmentsPerInterval + numUsedSegmentsPerInterval + i);
+        allocateTasks.add(
+            () -> allocateWithoutLineageCheck(
+                task,
+                interval.getStart(),
+                Granularities.NONE,
+                Granularities.DAY,
+                sequenceId,
+                TaskLockType.APPEND
+            )
+        );
+      }
+    }
+
+    final ExecutorService allocatorService = Execs.multiThreaded(4, "allocator-%d");
+    try {
+      final long startTime = System.nanoTime();
+      for (Future<SegmentIdWithShardSpec> future : allocatorService.invokeAll(allocateTasks)) {
+        future.get();
+      }
+      final long totalMillis = (System.nanoTime() - startTime) / 1_000_000;
+      System.out.println(
+          "Total allocation time for " + allocateTasks.size() + " requests: " + totalMillis + "ms."
+      );
+    }
+    finally {
+      // Always release the worker threads, even when an allocation fails.
+      allocatorService.shutdownNow();
+    }
+  }
+
   private SegmentIdWithShardSpec allocate(
       final Task task,
       final DateTime timestamp,
diff --git a/processing/src/main/java/org/apache/druid/timeline/SegmentTimeline.java b/processing/src/main/java/org/apache/druid/timeline/SegmentTimeline.java
index 83c345eacfd0..2bb92798a951 100644
--- a/processing/src/main/java/org/apache/druid/timeline/SegmentTimeline.java
+++ b/processing/src/main/java/org/apache/druid/timeline/SegmentTimeline.java
@@ -21,6 +21,7 @@
 
 import com.google.common.collect.Iterators;
 
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 
@@ -60,6 +61,17 @@ public void addSegments(Iterator segments)
     );
   }
 
+  /**
+   * Removes the given segments from this timeline. Each segment is located by its interval,
+   * version and the partition chunk created from its shard spec.
+   */
+  public void removeSegments(Collection<DataSegment> segments)
+  {
+    for (DataSegment segment : segments) {
+      remove(segment.getInterval(), segment.getVersion(), segment.getShardSpec().createChunk(segment));
+    }
+  }
+
   public boolean isOvershadowed(DataSegment segment)
   {
     return isOvershadowed(segment.getInterval(), segment.getVersion(), segment);
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index a512f7935740..3f10141c903e 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@
-119,6 +119,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
   private final SegmentSchemaManager segmentSchemaManager;
   private final CentralizedDatasourceSchemaConfig centralizedDatasourceSchemaConfig;
   private final boolean schemaPersistEnabled;
+  private final SegmentCache segmentCache = new SegmentCache();
 
   @Inject
   public IndexerSQLMetadataStorageCoordinator(
@@ -151,6 +152,62 @@ public void start()
     connector.createUpgradeSegmentsTable();
   }
 
+  /**
+   * Rebuilds the segment cache from the metadata store. Loads the used segments of every
+   * datasource, then the pending segments fetched for each active lock interval.
+   *
+   * @param datasourceToActiveLockIntervals intervals of active task locks, keyed by datasource
+   */
+  public void initializeCache(Map<String, Set<Interval>> datasourceToActiveLockIntervals)
+  {
+    segmentCache.clear();
+    for (String datasource : retrieveAllDatasourceNames()) {
+      segmentCache.addSegments(datasource, retrieveAllUsedSegments(datasource, Segments.INCLUDING_OVERSHADOWED));
+    }
+    for (Map.Entry<String, Set<Interval>> entry : datasourceToActiveLockIntervals.entrySet()) {
+      final String datasource = entry.getKey();
+      for (final Interval interval : entry.getValue()) {
+        segmentCache.addPendingSegments(
+            datasource,
+            getPendingSegments(datasource, interval).stream()
+                                                    .map(PendingSegmentRecord::getId)
+                                                    .collect(Collectors.toSet())
+        );
+      }
+    }
+  }
+
+  /**
+   * Cache-backed counterpart of {@link #retrieveUsedSegmentsForIntervals} for a single interval;
+   * answers from the in-memory segment cache instead of querying the metadata store.
+   *
+   * @return cached used segments of the datasource that overlap the interval; empty if the
+   *         cache holds no timeline for the datasource
+   */
+  public Set<DataSegment> retrieveUsedSegmentsForIntervalsFromCache(
+      final String dataSource,
+      final Interval interval,
+      final Segments visibility
+  )
+  {
+    final SegmentTimeline timeline = segmentCache.getDatasourceTimelines().get(dataSource);
+    if (timeline == null) {
+      return Collections.emptySet();
+    }
+    if (visibility == Segments.ONLY_VISIBLE) {
+      return timeline.findNonOvershadowedObjectsInInterval(interval, Partitions.ONLY_COMPLETE);
+    }
+    // lookup() yields only the visible chunks, so overshadowed segments have to be collected
+    // by scanning every segment held by the timeline.
+    final Set<DataSegment> overlapping = new HashSet<>();
+    for (DataSegment segment : timeline.iterateAllObjects()) {
+      if (segment.getInterval().overlaps(interval)) {
+        overlapping.add(segment);
+      }
+    }
+    return overlapping;
+  }
+
   @Override
   public Set<DataSegment> retrieveUsedSegmentsForIntervals(
       final String dataSource,
@@ -170,6 +227,24 @@ public Set retrieveAllUsedSegments(String dataSource, Segments visi
     return doRetrieveUsedSegments(dataSource, Collections.emptyList(), visibility);
   }
 
+  /**
+   * Retrieves the names of all datasources that have at least one used segment.
+   */
+  private List<String> retrieveAllDatasourceNames()
+  {
+    final String query = StringUtils.format(
+        "SELECT DISTINCT datasource FROM %s WHERE used = true",
+        dbTables.getSegmentsTable()
+    );
+    // Run and fully materialize the query while the handle is still open. The datasource
+    // column holds the plain name, so it is read as a string rather than parsed as JSON.
+    return connector.retryWithHandle(
+        handle -> handle.createQuery(query)
+                        .map((index, r, ctx) -> r.getString(1))
+                        .list()
+    );
+  }
+
   /**
    * @param intervals empty list means unrestricted interval.
    */
@@ -530,6 +605,7 @@ public SegmentPublishResult commitSegmentsAndMetadata(
                     usedSegments,
                     segmentSchemaMapping
                 );
+            segmentCache.addSegments(dataSource, inserted);
             return SegmentPublishResult.ok(ImmutableSet.copyOf(inserted));
           },
           3,
@@ -752,9 +828,9 @@ public SegmentIdWithShardSpec allocatePendingSegment(
     return connector.retryWithHandle(
         handle -> {
           // Get the time chunk and associated data segments for the given interval, if any
+          final SegmentTimeline timeline = segmentCache.getDatasourceTimelines().get(dataSource);
           final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
-              getTimelineForIntervalsWithHandle(handle, dataSource, ImmutableList.of(interval))
-                  .lookup(interval);
+              timeline == null ? Collections.emptyList() : timeline.lookup(interval);
           if (existingChunks.size() > 1) {
             // Not possible to expand more than one chunk with a single segment.
             log.warn(
@@ -1012,9 +1088,9 @@ private Map allocatePendingSegment
   ) throws IOException
   {
     // Get the time chunk and associated data segments for the given interval, if any
+    final SegmentTimeline timeline = segmentCache.getDatasourceTimelines().get(dataSource);
     final List<TimelineObjectHolder<String, DataSegment>> existingChunks =
-        getTimelineForIntervalsWithHandle(handle, dataSource, Collections.singletonList(interval))
-            .lookup(interval);
+        timeline == null ? Collections.emptyList() : timeline.lookup(interval);
     if (existingChunks.size() > 1) {
       log.warn(
           "Cannot allocate new segments for dataSource[%s], interval[%s] as interval already has [%,d] chunks.",
@@ -1511,6 +1587,21 @@ private SegmentPublishResult commitAppendSegmentsAndMetadataInTransaction(
     }
   }
 
+  /**
+   * Test-only helper that inserts the given pending segments into the metadata store.
+   */
+  @VisibleForTesting
+  public void insertPendingSegments(
+      String dataSource,
+      List<PendingSegmentRecord> pendingSegments,
+      boolean skipSegmentLineageCheck
+  )
+  {
+    connector.retryWithHandle(
+        handle -> insertPendingSegmentsIntoMetastore(handle, pendingSegments, dataSource, skipSegmentLineageCheck)
+    );
+  }
+
   @VisibleForTesting
   int insertPendingSegmentsIntoMetastore(
       Handle handle,
@@ -1556,6 +1647,8 @@ int insertPendingSegmentsIntoMetastore(
       processedSegmentIds.add(segmentId);
     }
 
     int[] updated = insertBatch.execute();
+    // Cache the ids only once the batch insert has gone through.
+    segmentCache.addPendingSegments(dataSource, processedSegmentIds);
     return Arrays.stream(updated).sum();
   }
@@ -1592,6 +1685,7 @@ private void insertPendingSegmentIntoMetastore(
           .bind("payload", jsonMapper.writeValueAsBytes(newIdentifier))
           .bind("task_allocator_id", taskAllocatorId)
           .execute();
+    segmentCache.addPendingSegments(dataSource, Collections.singleton(newIdentifier));
   }
 
   private Map<SegmentCreateRequest, PendingSegmentRecord> createNewSegments(
@@ -1643,9 +1737,7 @@ private Map createNewSegments(
     // A pending segment having a higher partitionId must also be considered
     // to avoid clashes when inserting the pending segment created here.
     final Set<SegmentIdWithShardSpec> pendingSegments = new HashSet<>(
-        getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).stream()
-                                                                             .map(PendingSegmentRecord::getId)
-                                                                             .collect(Collectors.toSet())
+        segmentCache.getPendingSegments(dataSource, interval)
     );
 
     final Map<SegmentCreateRequest, PendingSegmentRecord> createdSegments = new HashMap<>();
@@ -1851,9 +1943,7 @@ private SegmentIdWithShardSpec createNewSegment(
     // A pending segment having a higher partitionId must also be considered
     // to avoid clashes when inserting the pending segment created here.
final Set<SegmentIdWithShardSpec> pendings = new HashSet<>(
-        getPendingSegmentsForIntervalWithHandle(handle, dataSource, interval).stream()
-                                                                             .map(PendingSegmentRecord::getId)
-                                                                             .collect(Collectors.toSet())
+        segmentCache.getPendingSegments(dataSource, interval)
     );
     if (committedMaxId != null) {
       pendings.add(committedMaxId);
@@ -2378,6 +2446,12 @@ private Set insertSegments(
       }
     }
 
+    if (!segmentsToInsert.isEmpty()) {
+      segmentCache.addSegments(
+          segmentsToInsert.iterator().next().getDataSource(),
+          segmentsToInsert
+      );
+    }
     return segmentsToInsert;
   }
 
diff --git a/server/src/main/java/org/apache/druid/metadata/SegmentCache.java b/server/src/main/java/org/apache/druid/metadata/SegmentCache.java
new file mode 100644
index 000000000000..99e0b56368f8
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/metadata/SegmentCache.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentTimeline;
+import org.joda.time.Interval;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * In-memory cache of used segments, kept as one {@link SegmentTimeline} per datasource, and of
+ * pending segment ids, kept per datasource and interval.
+ * <p>
+ * Mutations are serialized by a write lock. The timeline map is a concurrent map so that the
+ * view returned by {@link #getDatasourceTimelines()} stays safe to read without the lock.
+ */
+public class SegmentCache
+{
+  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+  private final Map<String, SegmentTimeline> datasourceTimelines = new ConcurrentHashMap<>();
+  private final Set<DataSegment> addedSegments = new HashSet<>();
+  private final Set<DataSegment> removedSegments = new HashSet<>();
+  private final Map<String, Map<Interval, Set<SegmentIdWithShardSpec>>> datasourcePendingSegments
+      = new HashMap<>();
+
+  /**
+   * Drops all cached timelines, pending segments and the accumulated change set.
+   */
+  public void clear()
+  {
+    lock.writeLock().lock();
+    try {
+      datasourceTimelines.clear();
+      addedSegments.clear();
+      removedSegments.clear();
+      datasourcePendingSegments.clear();
+    }
+    finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Registers pending segment ids for the datasource, grouped by their interval.
+   */
+  public void addPendingSegments(String datasource, Collection<SegmentIdWithShardSpec> pendingSegments)
+  {
+    lock.writeLock().lock();
+    try {
+      final Map<Interval, Set<SegmentIdWithShardSpec>> intervalToPendingSegments =
+          datasourcePendingSegments.computeIfAbsent(datasource, ds -> new HashMap<>());
+      for (SegmentIdWithShardSpec pendingSegment : pendingSegments) {
+        intervalToPendingSegments.computeIfAbsent(pendingSegment.getInterval(), itvl -> new HashSet<>())
+                                 .add(pendingSegment);
+      }
+    }
+    finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the cached pending segment ids of the datasource whose interval overlaps the
+   * given interval; empty if none are cached.
+   */
+  public Set<SegmentIdWithShardSpec> getPendingSegments(String datasource, Interval interval)
+  {
+    lock.readLock().lock();
+    try {
+      final Map<Interval, Set<SegmentIdWithShardSpec>> intervalToPendingSegments =
+          datasourcePendingSegments.get(datasource);
+      if (intervalToPendingSegments == null) {
+        return Collections.emptySet();
+      }
+      final Set<SegmentIdWithShardSpec> overlapping = new HashSet<>();
+      for (Map.Entry<Interval, Set<SegmentIdWithShardSpec>> entry : intervalToPendingSegments.entrySet()) {
+        if (entry.getKey().overlaps(interval)) {
+          overlapping.addAll(entry.getValue());
+        }
+      }
+      return overlapping;
+    }
+    finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Adds the segments to the timeline of the datasource, creating the timeline if needed.
+   */
+  public void addSegments(String datasource, Collection<DataSegment> segments)
+  {
+    lock.writeLock().lock();
+    try {
+      // NOTE(review): additions are not recorded in the change set, unlike removals, so
+      // getChangeSetAndReset() never reports added segments -- confirm this is intended.
+      datasourceTimelines.computeIfAbsent(datasource, ds -> new SegmentTimeline())
+                         .addSegments(segments.iterator());
+    }
+    finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Removes the segments from the timeline of the datasource and records them as removed in
+   * the change set. Drops the timeline once it becomes empty; no-op for an unknown datasource.
+   */
+  public void removeSegments(String datasource, Collection<DataSegment> segments)
+  {
+    lock.writeLock().lock();
+    try {
+      final SegmentTimeline timeline = datasourceTimelines.get(datasource);
+      if (timeline == null) {
+        return;
+      }
+      timeline.removeSegments(segments);
+      if (timeline.isEmpty()) {
+        datasourceTimelines.remove(datasource);
+      }
+      removedSegments.addAll(segments);
+      addedSegments.removeAll(segments);
+    }
+    finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Returns a read-only view of the per-datasource timelines. The view is backed by a
+   * concurrent map, so callers may look up timelines without holding the cache lock.
+   */
+  public Map<String, SegmentTimeline> getDatasourceTimelines()
+  {
+    return Collections.unmodifiableMap(datasourceTimelines);
+  }
+
+  /**
+   * Returns the segments added and removed since the previous call, as (added, removed), and
+   * resets both sets.
+   */
+  public Pair<Set<DataSegment>, Set<DataSegment>> getChangeSetAndReset()
+  {
+    lock.writeLock().lock();
+    try {
+      final Pair<Set<DataSegment>, Set<DataSegment>> changeSet = new Pair<>(
+          ImmutableSet.copyOf(addedSegments),
+          ImmutableSet.copyOf(removedSegments)
+      );
+      addedSegments.clear();
+      removedSegments.clear();
+      return changeSet;
+    }
+    finally {
+      lock.writeLock().unlock();
+    }
+  }
+}