diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
index 808cda53b948..d9dcef85bea1 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java
@@ -46,6 +46,7 @@
 import io.druid.segment.loading.DataSegmentPusher;
 import io.druid.segment.realtime.FireDepartmentMetrics;
 import io.druid.segment.realtime.FireHydrant;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
 import io.druid.segment.realtime.plumber.Plumber;
 import io.druid.segment.realtime.plumber.PlumberSchool;
 import io.druid.segment.realtime.plumber.Sink;
@@ -93,6 +94,7 @@ public YeOldePlumberSchool(
   @Override
   public Plumber findPlumber(
+      final SegmentAllocator segmentAllocator,
       final DataSchema schema,
       final RealtimeTuningConfig config,
       final FireDepartmentMetrics metrics
@@ -123,7 +125,7 @@ public Object startJob()
       }

       @Override
-      public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
+      public int add(InputRow row, String sequenceName, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
       {
         Sink sink = getSink(row.getTimestampFromEpoch());
         if (sink == null) {
diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
index fe2a92f2b9db..ec7ec1effd8f 100644
--- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java
@@ -34,7 +34,7 @@
 import io.druid.discovery.DiscoveryDruidNode;
 import io.druid.discovery.DruidNodeDiscoveryProvider;
 import io.druid.discovery.LookupNodeService;
-import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TaskToolbox;
 import io.druid.indexing.common.actions.LockAcquireAction;
@@ -57,6 +57,7 @@
 import io.druid.segment.realtime.FireDepartmentMetrics;
 import io.druid.segment.realtime.RealtimeMetricsMonitor;
 import io.druid.segment.realtime.SegmentPublisher;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
 import io.druid.segment.realtime.firehose.ClippedFirehoseFactory;
 import io.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
 import io.druid.segment.realtime.firehose.TimedShutoffFirehoseFactory;
@@ -65,12 +66,10 @@
 import io.druid.segment.realtime.plumber.PlumberSchool;
 import io.druid.segment.realtime.plumber.Plumbers;
 import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
-import io.druid.segment.realtime.plumber.VersioningPolicy;
 import io.druid.server.coordination.DataSegmentAnnouncer;
 import io.druid.timeline.DataSegment;
 import org.apache.commons.io.FileUtils;
 import org.joda.time.DateTime;
-import org.joda.time.Interval;

 import java.io.File;
 import java.io.IOException;
@@ -261,35 +260,10 @@ public void unannounceSegments(Iterable<DataSegment> segments) throws IOException
       }
     };

-    // NOTE: getVersion will block if there is lock contention, which will block plumber.getSink
-    // NOTE: (and thus the firehose)
-
-    // Shouldn't usually happen, since we don't expect people to submit tasks that intersect with the
-    // realtime window, but if they do it can be problematic. If we decide to care, we can use more threads in
-    // the plumber such that waiting for the coordinator doesn't block data processing.
-    final VersioningPolicy versioningPolicy = new VersioningPolicy()
-    {
-      @Override
-      public String getVersion(final Interval interval)
-      {
-        try {
-          // Side effect: Calling getVersion causes a lock to be acquired
-          final TaskLock myLock = toolbox.getTaskActionClient()
-                                         .submit(new LockAcquireAction(interval, lockTimeoutMs));
-
-          return myLock.getVersion();
-        }
-        catch (IOException e) {
-          throw Throwables.propagate(e);
-        }
-      }
-    };
-
     DataSchema dataSchema = spec.getDataSchema();
     RealtimeIOConfig realtimeIOConfig = spec.getIOConfig();
     RealtimeTuningConfig tuningConfig = spec.getTuningConfig()
-                                            .withBasePersistDirectory(toolbox.getPersistDir())
-                                            .withVersioningPolicy(versioningPolicy);
+                                            .withBasePersistDirectory(toolbox.getPersistDir());

     final FireDepartment fireDepartment = new FireDepartment(
         dataSchema,
@@ -305,6 +279,8 @@ public String getVersion(final Interval interval)
     );
     this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();

+    final SegmentAllocator segmentAllocator = createSegmentAllocator(toolbox);
+
     // NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means
     // NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip
     // NOTE: (partitionNum_index.zip for HDFS data storage) and descriptor.json (partitionNum_descriptor.json for
@@ -325,7 +301,7 @@ public String getVersion(final Interval interval)
         toolbox.getObjectMapper()
     );

-    this.plumber = plumberSchool.findPlumber(dataSchema, tuningConfig, metrics);
+    this.plumber = plumberSchool.findPlumber(segmentAllocator, dataSchema, tuningConfig, metrics);

     Supplier<Committer> committerSupplier = null;
     final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
@@ -367,10 +343,13 @@ public String getVersion(final Interval interval)
       }
     }

+    final String sequenceName = getId();
+
     // Time to read data!
     while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
       Plumbers.addNextRow(
           committerSupplier,
+          sequenceName,
           firehose,
           plumber,
           tuningConfig.isReportParseExceptions(),
@@ -532,6 +511,11 @@ && isFirehoseDrainableByClosing(((TimedShutoffFirehoseFactory) firehoseFactory).
            && isFirehoseDrainableByClosing(((ClippedFirehoseFactory) firehoseFactory).getDelegate()));
   }

+  protected SegmentAllocator createSegmentAllocator(TaskToolbox toolbox)
+  {
+    return new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), spec.getDataSchema());
+  }
+
   public static class TaskActionSegmentPublisher implements SegmentPublisher
   {
     final Task task;
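Commentary: segment allocation is now an explicit, injectable step instead of a side effect of VersioningPolicy.getVersion(), which blocked on lock acquisition. The protected createSegmentAllocator() hook lets subclasses substitute their own allocator. A minimal sketch of such a substitute, assuming only the single-method SegmentAllocator interface used throughout this patch; the DAY granularity, datasource, fixed version string, and LinearShardSpec below are illustrative, not from the patch:

    // Sketch of a custom allocator a subclass could return from createSegmentAllocator().
    final SegmentAllocator fixedVersionAllocator = new SegmentAllocator()
    {
      @Override
      public SegmentIdentifier allocate(InputRow row, String sequenceName, String previousSegmentId)
      {
        // Bucket the row's timestamp into a segment interval, like SpecBasedSegmentAllocator below.
        final DateTime start = Granularities.DAY.bucketStart(row.getTimestamp());
        final Interval interval = new Interval(start, Granularities.DAY.increment(start));
        // Fixed version and shard spec: enough for a test, not for production allocation.
        return new SegmentIdentifier("test_ds", interval, "2017-01-01T00:00:00.000Z", new LinearShardSpec(0));
      }
    };
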
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
index 77a0f0fa93e2..688a64523628 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTask.java
@@ -32,6 +32,7 @@
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.realtime.FireDepartment;
 import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
 import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import io.druid.segment.realtime.plumber.Plumber;
 import io.druid.segment.realtime.plumber.PlumberSchool;
@@ -63,7 +64,7 @@ public TestRealtimeTask(
         {
           @Override
           public Plumber findPlumber(
-              DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
+              SegmentAllocator segmentAllocator, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
           )
           {
             return null;
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
index c40d8e30ec9c..d1c8a4bbbb88 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java
@@ -27,7 +27,6 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -59,15 +58,15 @@
 import io.druid.indexing.common.actions.TaskActionToolbox;
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.config.TaskStorageConfig;
+import io.druid.indexing.overlord.DataSourceMetadata;
 import io.druid.indexing.overlord.HeapMemoryTaskStorage;
-import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import io.druid.indexing.overlord.SegmentPublishResult;
 import io.druid.indexing.overlord.TaskLockbox;
 import io.druid.indexing.overlord.TaskStorage;
 import io.druid.indexing.overlord.supervisor.SupervisorManager;
 import io.druid.indexing.test.TestDataSegmentAnnouncer;
 import io.druid.indexing.test.TestDataSegmentKiller;
 import io.druid.indexing.test.TestDataSegmentPusher;
-import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.java.util.common.DateTimes;
 import io.druid.java.util.common.ISE;
@@ -77,6 +76,8 @@
 import io.druid.java.util.common.guava.Sequences;
 import io.druid.java.util.common.parsers.ParseException;
 import io.druid.metadata.EntryExistsException;
+import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
+import io.druid.metadata.TestDerbyConnector;
 import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
 import io.druid.query.Druids;
 import io.druid.query.IntervalChunkingQueryRunnerDecorator;
@@ -100,6 +101,7 @@
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeIOConfig;
 import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.loading.SegmentLoaderConfig;
 import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
@@ -112,6 +114,7 @@
 import io.druid.server.coordination.DataSegmentServerAnnouncer;
 import io.druid.server.coordination.ServerType;
 import io.druid.timeline.DataSegment;
+import io.druid.timeline.partition.LinearShardSpec;
 import org.easymock.EasyMock;
 import org.hamcrest.CoreMatchers;
 import org.joda.time.DateTime;
@@ -131,9 +134,12 @@
 import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
@@ -237,9 +243,13 @@ public Firehose connect(InputRowParser parser, File temporaryDirectory) throws IOException
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();

+  @Rule
+  public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
+
   private DateTime now;
   private ListeningExecutorService taskExec;
   private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;
+  private Collection<DataSegment> publishedSegments = new CopyOnWriteArrayList<>();

   @Before
   public void setUp()
@@ -248,6 +258,12 @@ public void setUp()
     emitter.start();
     taskExec = MoreExecutors.listeningDecorator(Execs.singleThreaded("realtime-index-task-test-%d"));
     now = DateTimes.nowUtc();
+
+    TestDerbyConnector derbyConnector = derbyConnectorRule.getConnector();
+    derbyConnector.createDataSourceTable();
+    derbyConnector.createTaskTables();
+    derbyConnector.createSegmentTable();
+    derbyConnector.createPendingSegmentsTable();
   }

   @After
@@ -276,9 +292,8 @@ public void testDefaultResource() throws Exception
   @Test(timeout = 60_000L, expected = ExecutionException.class)
   public void testHandoffTimeout() throws Exception
   {
-    final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
     final RealtimeIndexTask task = makeRealtimeTask(null, true, 100L);
-    final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder());
+    final TaskToolbox taskToolbox = makeToolbox(task, tempFolder.newFolder());
     final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox);

     // Wait for firehose to show up, it starts off null.
@@ -302,12 +317,12 @@ public void testHandoffTimeout() throws Exception
     firehose.close();

     // Wait for publish.
-    while (mdc.getPublished().isEmpty()) {
+    while (publishedSegments.isEmpty()) {
       Thread.sleep(50);
     }

     Assert.assertEquals(1, task.getMetrics().processed());
-    Assert.assertNotNull(Iterables.getOnlyElement(mdc.getPublished()));
+    Assert.assertNotNull(Iterables.getOnlyElement(publishedSegments));

     // handoff would timeout, resulting in exception
@@ -317,9 +332,9 @@ public void testHandoffTimeout() throws Exception
   @Test(timeout = 60_000L)
   public void testBasics() throws Exception
   {
-    final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
+
     final RealtimeIndexTask task = makeRealtimeTask(null);
-    final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder());
+    final TaskToolbox taskToolbox = makeToolbox(task, tempFolder.newFolder());
     final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox);
     final DataSegment publishedSegment;
@@ -354,11 +369,11 @@ public void testBasics() throws Exception
     firehose.close();

     // Wait for publish.
-    while (mdc.getPublished().isEmpty()) {
+    while (publishedSegments.isEmpty()) {
       Thread.sleep(50);
     }

-    publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
+    publishedSegment = Iterables.getOnlyElement(publishedSegments);

     // Check metrics.
     Assert.assertEquals(2, task.getMetrics().processed());
@@ -392,9 +407,8 @@ public void testBasics() throws Exception
   @Test(timeout = 60_000L)
   public void testReportParseExceptionsOnBadMetric() throws Exception
   {
-    final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
     final RealtimeIndexTask task = makeRealtimeTask(null, true);
-    final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder());
+    final TaskToolbox taskToolbox = makeToolbox(task, tempFolder.newFolder());
     final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox);

     // Wait for firehose to show up, it starts off null.
@@ -458,9 +472,8 @@ public void testReportParseExceptionsOnBadMetric() throws Exception
   @Test(timeout = 60_000L)
   public void testNoReportParseExceptions() throws Exception
   {
-    final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
     final RealtimeIndexTask task = makeRealtimeTask(null, false);
-    final TaskToolbox taskToolbox = makeToolbox(task, mdc, tempFolder.newFolder());
+    final TaskToolbox taskToolbox = makeToolbox(task, tempFolder.newFolder());
     final ListenableFuture<TaskStatus> statusFuture = runTask(task, taskToolbox);
     final DataSegment publishedSegment;
@@ -512,11 +525,11 @@ public void testNoReportParseExceptions() throws Exception
     firehose.close();

     // Wait for publish.
-    while (mdc.getPublished().isEmpty()) {
+    while (publishedSegments.isEmpty()) {
       Thread.sleep(50);
     }

-    publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
+    publishedSegment = Iterables.getOnlyElement(publishedSegments);

     // Check metrics.
     Assert.assertEquals(3, task.getMetrics().processed());
@@ -556,8 +569,7 @@ public void testRestore() throws Exception
     // First run:
     {
-      final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
-      final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory);
+      final TaskToolbox taskToolbox = makeToolbox(task1, directory);
       final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox);

       // Wait for firehose to show up, it starts off null.
@@ -585,14 +597,13 @@
       Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode());

       // Nothing should be published.
-      Assert.assertEquals(Sets.newHashSet(), mdc.getPublished());
+      Assert.assertTrue(publishedSegments.isEmpty());
     }

     // Second run:
     {
-      final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
       final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId());
-      final TaskToolbox taskToolbox = makeToolbox(task2, mdc, directory);
+      final TaskToolbox taskToolbox = makeToolbox(task2, directory);
       final ListenableFuture<TaskStatus> statusFuture = runTask(task2, taskToolbox);

       // Wait for firehose to show up, it starts off null.
@@ -619,11 +630,11 @@
       firehose.close();

       // Wait for publish.
-      while (mdc.getPublished().isEmpty()) {
+      while (publishedSegments.isEmpty()) {
         Thread.sleep(50);
       }

-      publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
+      publishedSegment = Iterables.getOnlyElement(publishedSegments);

       // Do a query.
       Assert.assertEquals(2, sumMetric(task2, "rows"));
@@ -653,14 +664,13 @@
   public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception
   {
     final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
-    final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
     final File directory = tempFolder.newFolder();
     final RealtimeIndexTask task1 = makeRealtimeTask(null);
     final DataSegment publishedSegment;

     // First run:
     {
-      final TaskToolbox taskToolbox = makeToolbox(task1, taskStorage, mdc, directory);
+      final TaskToolbox taskToolbox = makeToolbox(task1, taskStorage, directory);
       final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox);

       // Wait for firehose to show up, it starts off null.
@@ -684,11 +694,11 @@ public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception
       firehose.close();

       // Wait for publish.
-      while (mdc.getPublished().isEmpty()) {
+      while (publishedSegments.isEmpty()) {
         Thread.sleep(50);
       }

-      publishedSegment = Iterables.getOnlyElement(mdc.getPublished());
+      publishedSegment = Iterables.getOnlyElement(publishedSegments);

       // Do a query.
       Assert.assertEquals(1, sumMetric(task1, "rows"));
@@ -705,7 +715,7 @@ public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception
     // Second run:
     {
       final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId());
-      final TaskToolbox taskToolbox = makeToolbox(task2, taskStorage, mdc, directory);
+      final TaskToolbox taskToolbox = makeToolbox(task2, taskStorage, directory);
       final ListenableFuture<TaskStatus> statusFuture = runTask(task2, taskToolbox);

       // Wait for firehose to show up, it starts off null.
@@ -720,7 +730,7 @@ public void testRestoreAfterHandoffAttemptDuringShutdown() throws Exception
       firehose.close();

       // publishedSegment is still published. No reason it shouldn't be.
-      Assert.assertEquals(ImmutableSet.of(publishedSegment), mdc.getPublished());
+      Assert.assertEquals(ImmutableSet.of(publishedSegment), ImmutableSet.copyOf(publishedSegments));

       // Wait for a handoffCallback to show up.
       while (handOffCallbacks.isEmpty()) {
@@ -756,8 +766,7 @@ public void testRestoreCorruptData() throws Exception
     // First run:
     {
-      final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
-      final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory);
+      final TaskToolbox taskToolbox = makeToolbox(task1, directory);
       final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox);

       // Wait for firehose to show up, it starts off null.
@@ -785,7 +794,7 @@ public void testRestoreCorruptData() throws Exception
       Assert.assertEquals(TaskStatus.Status.SUCCESS, taskStatus.getStatusCode());

       // Nothing should be published.
-      Assert.assertEquals(Sets.newHashSet(), mdc.getPublished());
+      Assert.assertTrue(publishedSegments.isEmpty());
     }

     // Corrupt the data:
@@ -804,9 +813,8 @@
     // Second run:
     {
-      final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
       final RealtimeIndexTask task2 = makeRealtimeTask(task1.getId());
-      final TaskToolbox taskToolbox = makeToolbox(task2, mdc, directory);
+      final TaskToolbox taskToolbox = makeToolbox(task2, directory);
       final ListenableFuture<TaskStatus> statusFuture = runTask(task2, taskToolbox);

       // Wait for the task to finish.
@@ -828,8 +836,7 @@ public void testStopBeforeStarting() throws Exception
     final RealtimeIndexTask task1 = makeRealtimeTask(null);
     task1.stopGracefully();

-    final TestIndexerMetadataStorageCoordinator mdc = new TestIndexerMetadataStorageCoordinator();
-    final TaskToolbox taskToolbox = makeToolbox(task1, mdc, directory);
+    final TaskToolbox taskToolbox = makeToolbox(task1, directory);
     final ListenableFuture<TaskStatus> statusFuture = runTask(task1, taskToolbox);

     // Wait for the task to finish.
@@ -874,11 +881,12 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions)
   private RealtimeIndexTask makeRealtimeTask(final String taskId, boolean reportParseExceptions, long handoffTimeout)
   {
     ObjectMapper objectMapper = new DefaultObjectMapper();
+    GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null);
     DataSchema dataSchema = new DataSchema(
         "test_ds",
         null,
         new AggregatorFactory[]{new CountAggregatorFactory("rows"), new LongSumAggregatorFactory("met1", "met1")},
-        new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+        granularitySpec,
         objectMapper
     );
     RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
@@ -920,14 +928,12 @@ protected boolean isFirehoseDrainableByClosing(FirehoseFactory firehoseFactory)
   private TaskToolbox makeToolbox(
       final Task task,
-      final IndexerMetadataStorageCoordinator mdc,
       final File directory
   )
   {
     return makeToolbox(
         task,
         new HeapMemoryTaskStorage(new TaskStorageConfig(null)),
-        mdc,
         directory
     );
   }
@@ -935,10 +941,40 @@ private TaskToolbox makeToolbox(
   private TaskToolbox makeToolbox(
       final Task task,
       final TaskStorage taskStorage,
-      final IndexerMetadataStorageCoordinator mdc,
       final File directory
   )
   {
+    ObjectMapper mapper = new DefaultObjectMapper();
+    mapper.registerSubtypes(LinearShardSpec.class);
+    IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(
+        mapper,
+        derbyConnectorRule.metadataTablesConfigSupplier().get(),
+        derbyConnectorRule.getConnector()
+    )
+    {
+      @Override
+      public Set<DataSegment> announceHistoricalSegments(Set<DataSegment> segments) throws IOException
+      {
+        Set<DataSegment> result = super.announceHistoricalSegments(segments);
+
+        publishedSegments.addAll(result);
+
+        return result;
+      }
+
+      @Override
+      public SegmentPublishResult announceHistoricalSegments(
+          Set<DataSegment> segments, DataSourceMetadata startMetadata, DataSourceMetadata endMetadata
+      ) throws IOException
+      {
+        SegmentPublishResult result = super.announceHistoricalSegments(segments, startMetadata, endMetadata);
+
+        publishedSegments.addAll(result.getSegments());
+
+        return result;
+      }
+    };
+
     final TaskConfig taskConfig = new TaskConfig(directory.getPath(), null, null, 50000, null, false, null, null);
     final TaskLockbox taskLockbox = new TaskLockbox(taskStorage);
     try {
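Commentary: the tests now publish through a real, Derby-backed IndexerSQLMetadataStorageCoordinator rather than the in-memory test coordinator, and the anonymous overrides in makeToolbox() record whatever was actually committed to the metadata store into publishedSegments. That field is a CopyOnWriteArrayList because the task's publish executor writes it while the test thread reads it; the polling idiom used by each test above is:

    // Test thread waits for the task's publish thread to hit the override in makeToolbox().
    while (publishedSegments.isEmpty()) {
      Thread.sleep(50);
    }
    final DataSegment publishedSegment = Iterables.getOnlyElement(publishedSegments);
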
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index 02dfdbbd050a..85d3bd9b048c 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -48,6 +48,7 @@
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.realtime.FireDepartment;
 import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
 import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import io.druid.segment.realtime.plumber.Plumber;
 import io.druid.segment.realtime.plumber.PlumberSchool;
@@ -476,7 +477,7 @@ public void testRealtimeIndexTaskSerde() throws Exception
         {
           @Override
           public Plumber findPlumber(
-              DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
+              SegmentAllocator segmentAllocator, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
           )
           {
             return null;
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
index 8d38b8abc10c..763ac70c52be 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java
@@ -95,6 +95,7 @@
 import io.druid.segment.indexing.DataSchema;
 import io.druid.segment.indexing.RealtimeIOConfig;
 import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.indexing.granularity.GranularitySpec;
 import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.segment.loading.DataSegmentArchiver;
 import io.druid.segment.loading.DataSegmentMover;
@@ -107,6 +108,8 @@
 import io.druid.segment.loading.StorageLocationConfig;
 import io.druid.segment.realtime.FireDepartment;
 import io.druid.segment.realtime.FireDepartmentTest;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.TestSegmentAllocator;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
 import io.druid.server.DruidNode;
@@ -119,6 +122,7 @@
 import io.druid.timeline.partition.NoneShardSpec;
 import org.easymock.EasyMock;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Hours;
 import org.joda.time.Interval;
 import org.joda.time.Period;
@@ -1182,11 +1186,12 @@ private TaskStatus runTask(final Task task) throws Exception
   private RealtimeIndexTask newRealtimeIndexTask()
   {
     String taskId = StringUtils.format("rt_task_%s", System.currentTimeMillis());
+    GranularitySpec granularitySpec = new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null);
     DataSchema dataSchema = new DataSchema(
         "test_ds",
         null,
         new AggregatorFactory[]{new LongSumAggregatorFactory("count", "rows")},
-        new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+        granularitySpec,
         mapper
     );
     RealtimeIOConfig realtimeIOConfig = new RealtimeIOConfig(
@@ -1218,6 +1223,13 @@ private RealtimeIndexTask newRealtimeIndexTask()
         new TaskResource(taskId, 1),
         fireDepartment,
         null
-    );
+    ) {
+      @Override
+      protected SegmentAllocator createSegmentAllocator(TaskToolbox toolbox)
+      {
+        String version = DateTime.now(DateTimeZone.UTC).toString();
+        return new TestSegmentAllocator(dataSchema.getDataSource(), version, granularitySpec.getSegmentGranularity());
+      }
+    };
   }
 }
diff --git a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java
index 1d160e99bff6..b0c9698ea826 100644
--- a/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/worker/TaskAnnouncementTest.java
@@ -33,6 +33,7 @@
 import io.druid.segment.indexing.RealtimeTuningConfig;
 import io.druid.segment.realtime.FireDepartment;
 import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
 import io.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import io.druid.segment.realtime.plumber.Plumber;
 import io.druid.segment.realtime.plumber.PlumberSchool;
@@ -64,7 +65,7 @@ public void testBackwardsCompatibleSerde() throws Exception
         {
           @Override
           public Plumber findPlumber(
-              DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
+              SegmentAllocator segmentAllocator, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
           )
           {
             return null;
diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java
index 5338055a4d8b..795044ced3bc 100644
--- a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java
+++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java
@@ -240,27 +240,6 @@ public long getAlertTimeout()
     return alertTimeout;
   }

-  public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
-  {
-    return new RealtimeTuningConfig(
-        maxRowsInMemory,
-        intermediatePersistPeriod,
-        windowPeriod,
-        basePersistDirectory,
-        policy,
-        rejectionPolicyFactory,
-        maxPendingPersists,
-        shardSpec,
-        indexSpec,
-        true,
-        persistThreadPriority,
-        mergeThreadPriority,
-        reportParseExceptions,
-        handoffConditionTimeout,
-        alertTimeout
-    );
-  }
-
   public RealtimeTuningConfig withBasePersistDirectory(File dir)
   {
     return new RealtimeTuningConfig(
diff --git a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java
index eca78928eccf..fb2c35ff8903 100644
--- a/server/src/main/java/io/druid/segment/realtime/FireDepartment.java
+++ b/server/src/main/java/io/druid/segment/realtime/FireDepartment.java
@@ -28,6 +28,8 @@
 import io.druid.segment.indexing.IngestionSpec;
 import io.druid.segment.indexing.RealtimeIOConfig;
 import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SpecBasedSegmentAllocator;
 import io.druid.segment.realtime.plumber.Plumber;

 import java.io.IOException;
@@ -91,7 +93,8 @@ public RealtimeTuningConfig getTuningConfig()

   public Plumber findPlumber()
   {
-    return ioConfig.getPlumberSchool().findPlumber(dataSchema, tuningConfig, metrics);
+    final SegmentAllocator segmentAllocator = new SpecBasedSegmentAllocator(dataSchema, tuningConfig);
+    return ioConfig.getPlumberSchool().findPlumber(segmentAllocator, dataSchema, tuningConfig, metrics);
   }

   public boolean checkFirehoseV2()
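Commentary: a standalone realtime node has no overlord to ask for segments, so FireDepartment derives its allocator from the ingestion spec. SpecBasedSegmentAllocator (added later in this patch) reproduces the old plumber behavior: bucket the row's timestamp by segment granularity, take the version from the spec's VersioningPolicy, and reuse the configured shard spec. A sketch of the resulting identifier for a spec with DAY segment granularity; the timestamps are illustrative:

    final SegmentAllocator allocator = new SpecBasedSegmentAllocator(dataSchema, tuningConfig);
    final SegmentIdentifier id = allocator.allocate(row, sequenceName, null);
    // For a row at 2017-08-15T13:00:00Z with DAY granularity:
    //   id.getInterval() -> 2017-08-15T00:00:00.000Z/2017-08-16T00:00:00.000Z
    //   version          -> tuningConfig.getVersioningPolicy().getVersion(interval)
    //   shard spec       -> tuningConfig.getShardSpec(), unchanged from the old code path
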
diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java
index fd30f61b29b5..70d1dbbb874e 100644
--- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java
+++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java
@@ -338,6 +338,7 @@ private boolean runFirehoseV2(FirehoseV2 firehose) throws Exception
       log.info("FirehoseV2 started");
       final Supplier<Committer> committerSupplier = Committers.supplierFromFirehoseV2(firehose);
+      final String sequenceName = generateSequenceName();
       boolean haveRow = true;
       while (haveRow) {
         if (Thread.interrupted() || stopping) {
@@ -348,7 +349,7 @@ private boolean runFirehoseV2(FirehoseV2 firehose) throws Exception
         try {
           inputRow = firehose.currRow();
           if (inputRow != null) {
-            numRows = plumber.add(inputRow, committerSupplier);
+            numRows = plumber.add(inputRow, sequenceName, committerSupplier);
             if (numRows < 0) {
               metrics.incrementThrownAway();
               log.debug("Throwing away event[%s]", inputRow);
@@ -379,15 +380,21 @@ private boolean runFirehoseV2(FirehoseV2 firehose) throws Exception
     private boolean runFirehose(Firehose firehose)
     {
       final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
+      final String sequenceName = generateSequenceName();
       while (firehose.hasMore()) {
         if (Thread.interrupted() || stopping) {
           return false;
         }
-        Plumbers.addNextRow(committerSupplier, firehose, plumber, config.isReportParseExceptions(), metrics);
+        Plumbers.addNextRow(committerSupplier, sequenceName, firehose, plumber, config.isReportParseExceptions(), metrics);
       }
       return true;
     }

+    private String generateSequenceName()
+    {
+      return "index_" + fireDepartment.getDataSchema().getDataSource() + "_" + config.getShardSpec().getPartitionNum();
+    }
+
     public <T> QueryRunner<T> getQueryRunner(Query<T> query)
     {
       QueryRunnerFactory<T, Query<T>> factory = conglomerate.findFactory(query);
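Commentary: both firehose paths now tag every row with a deterministic sequence name of the form index_<dataSource>_<partitionNum>, so the same node always drives the same allocation sequence in the tracker. For a datasource named "wikipedia" on partition 0 the loop above effectively becomes (names illustrative):

    final String sequenceName = "index_wikipedia_0"; // what generateSequenceName() returns for this node
    final int numRows = plumber.add(inputRow, sequenceName, committerSupplier);
    if (numRows < 0) {
      metrics.incrementThrownAway(); // the row fell outside the window, as in runFirehoseV2 above
    }
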
diff --git a/server/src/main/java/io/druid/segment/realtime/SegmentTracker.java b/server/src/main/java/io/druid/segment/realtime/SegmentTracker.java
new file mode 100644
index 000000000000..3494eed093a7
--- /dev/null
+++ b/server/src/main/java/io/druid/segment/realtime/SegmentTracker.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import io.druid.data.input.InputRow;
+import io.druid.java.util.common.ISE;
+import io.druid.java.util.common.logger.Logger;
+import io.druid.segment.realtime.appenderator.SegmentAllocator;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import org.joda.time.DateTime;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Allocates and tracks actively used segments.
+ */
+public class SegmentTracker
+{
+  private static final Logger log = new Logger(SegmentTracker.class);
+
+  // All access to "activeSegments" and "lastSegmentId" must be synchronized on "activeSegments".
+
+  // sequenceName -> start of segment interval -> segment we're currently adding data to
+  private final Map<String, NavigableMap<Long, SegmentIdentifier>> activeSegments = new TreeMap<>();
+
+  // sequenceName -> list of identifiers of segments waiting for being published
+  // publishPendingSegments is always a super set of activeSegments because there can be some segments to which data
+  // are not added anymore, but not published yet.
+  private final Map<String, List<SegmentIdentifier>> publishPendingSegments = new HashMap<>();
+
+  // sequenceName -> most recently allocated segment
+  private final Map<String, String> lastSegmentIds = Maps.newHashMap();
+
+  private final SegmentAllocator segmentAllocator;
+
+  public SegmentTracker(SegmentAllocator segmentAllocator)
+  {
+    this.segmentAllocator = segmentAllocator;
+  }
+
+  @VisibleForTesting
+  public Map<String, NavigableMap<Long, SegmentIdentifier>> getActiveSegments()
+  {
+    return activeSegments;
+  }
+
+  @VisibleForTesting
+  public Map<String, List<SegmentIdentifier>> getPublishPendingSegments()
+  {
+    return publishPendingSegments;
+  }
+
+  public void clear()
+  {
+    synchronized (activeSegments) {
+      activeSegments.clear();
+    }
+  }
+
+  public List<SegmentIdentifier> removePublished()
+  {
+    synchronized (activeSegments) {
+      return removePublished(
+          ImmutableList.copyOf(publishPendingSegments.keySet())
+      );
+    }
+  }
+
+  public List<SegmentIdentifier> removePublished(Collection<String> sequenceNames)
+  {
+    synchronized (activeSegments) {
+      final List<SegmentIdentifier> segments = sequenceNames.stream()
+                                                            .map(publishPendingSegments::remove)
+                                                            .filter(Objects::nonNull)
+                                                            .flatMap(Collection::stream)
+                                                            .collect(Collectors.toList());
+      sequenceNames.forEach(activeSegments::remove);
+
+      return segments;
+    }
+  }
+
+  public SegmentTrackerMetadata wrapMetadata(Object callerMetadata)
+  {
+    synchronized (activeSegments) {
+      return new SegmentTrackerMetadata(
+          ImmutableMap.copyOf(
+              Maps.transformValues(
+                  activeSegments,
+                  new Function<NavigableMap<Long, SegmentIdentifier>, List<SegmentIdentifier>>()
+                  {
+                    @Override
+                    public List<SegmentIdentifier> apply(NavigableMap<Long, SegmentIdentifier> input)
+                    {
+                      return ImmutableList.copyOf(input.values());
+                    }
+                  }
+              )
+          ),
+          ImmutableMap.copyOf(publishPendingSegments),
+          ImmutableMap.copyOf(lastSegmentIds),
+          callerMetadata
+      );
+    }
+  }
+
+  public void restoreFromMetadata(SegmentTrackerMetadata metadata)
+  {
+    synchronized (activeSegments) {
+      for (Map.Entry<String, List<SegmentIdentifier>> entry : metadata.getActiveSegments().entrySet()) {
+        final String sequenceName = entry.getKey();
+        final TreeMap<Long, SegmentIdentifier> segmentMap = Maps.newTreeMap();
+
+        activeSegments.put(sequenceName, segmentMap);
+
+        for (SegmentIdentifier identifier : entry.getValue()) {
+          segmentMap.put(identifier.getInterval().getStartMillis(), identifier);
+        }
+      }
+      publishPendingSegments.putAll(metadata.getPublishPendingSegments());
+      lastSegmentIds.putAll(metadata.getLastSegmentIds());
+    }
+  }
+
+  /**
+   * Return a segment usable for "timestamp". May return null if no segment can be allocated.
+   *
+   * @param row          Input row
+   * @param sequenceName sequenceName for potential segment allocation
+   *
+   * @return identifier, or null
+   *
+   * @throws IOException if an exception occurs while allocating a segment
+   */
+  public SegmentIdentifier getSegment(final InputRow row, final String sequenceName) throws IOException
+  {
+    synchronized (activeSegments) {
+      final DateTime timestamp = row.getTimestamp();
+      final SegmentIdentifier existing = getActiveSegment(timestamp, sequenceName);
+      if (existing != null) {
+        return existing;
+      } else {
+        // Allocate new segment.
+        final SegmentIdentifier newSegment = segmentAllocator.allocate(
+            row,
+            sequenceName,
+            lastSegmentIds.get(sequenceName)
+        );
+
+
+        if (newSegment != null) {
+          log.info("New segment[%s] for sequenceName[%s].", newSegment, sequenceName);
+          addSegment(sequenceName, newSegment);
+        } else {
+          // Well, we tried.
+          log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s]. ", timestamp, sequenceName);
+        }
+
+        return newSegment;
+      }
+    }
+  }
+
+  /**
+   * Move a set of identifiers out from "active", making way for newer segments.
+   */
+  public void moveSegmentOut(final String sequenceName, final List<SegmentIdentifier> identifiers)
+  {
+    synchronized (activeSegments) {
+      final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
+      if (activeSegmentsForSequence == null) {
+        throw new ISE("WTF?! Asked to remove segments for sequenceName[%s] which doesn't exist...", sequenceName);
+      }
+
+      for (final SegmentIdentifier identifier : identifiers) {
+        log.info("Moving segment[%s] out of active list.", identifier);
+        final long key = identifier.getInterval().getStartMillis();
+        if (!activeSegmentsForSequence.remove(key).equals(identifier)) {
+          throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", identifier);
+        }
+      }
+    }
+  }
+
+  private SegmentIdentifier getActiveSegment(final DateTime timestamp, final String sequenceName)
+  {
+    synchronized (activeSegments) {
+      final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
+
+      if (activeSegmentsForSequence == null) {
+        return null;
+      }
+
+      final Map.Entry<Long, SegmentIdentifier> candidateEntry = activeSegmentsForSequence.floorEntry(timestamp.getMillis());
+      if (candidateEntry != null && candidateEntry.getValue().getInterval().contains(timestamp)) {
+        return candidateEntry.getValue();
+      } else {
+        return null;
+      }
+    }
+  }
+
+  private void addSegment(String sequenceName, SegmentIdentifier identifier)
+  {
+    synchronized (activeSegments) {
+      activeSegments.computeIfAbsent(sequenceName, k -> new TreeMap<>())
+                    .putIfAbsent(identifier.getInterval().getStartMillis(), identifier);
+
+      publishPendingSegments.computeIfAbsent(sequenceName, k -> new ArrayList<>())
+                            .add(identifier);
+      lastSegmentIds.put(sequenceName, identifier.getIdentifierAsString());
+    }
+  }
+
+}
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java b/server/src/main/java/io/druid/segment/realtime/SegmentTrackerMetadata.java
similarity index 92%
rename from server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java
rename to server/src/main/java/io/druid/segment/realtime/SegmentTrackerMetadata.java
index edff72572d9e..b22210ffc00a 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriverMetadata.java
+++ b/server/src/main/java/io/druid/segment/realtime/SegmentTrackerMetadata.java
@@ -17,15 +17,16 @@
  * under the License.
  */

-package io.druid.segment.realtime.appenderator;
+package io.druid.segment.realtime;

 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;

 import java.util.List;
 import java.util.Map;

-public class AppenderatorDriverMetadata
+public class SegmentTrackerMetadata
 {
   private final Map<String, List<SegmentIdentifier>> activeSegments;
   private final Map<String, List<SegmentIdentifier>> publishPendingSegments;
@@ -33,7 +34,7 @@ public class AppenderatorDriverMetadata
   private final Object callerMetadata;

   @JsonCreator
-  public AppenderatorDriverMetadata(
+  public SegmentTrackerMetadata(
       @JsonProperty("activeSegments") Map<String, List<SegmentIdentifier>> activeSegments,
       @JsonProperty("publishPendingSegments") Map<String, List<SegmentIdentifier>> publishPendingSegments,
       @JsonProperty("lastSegmentIds") Map<String, String> lastSegmentIds,
@@ -73,7 +74,7 @@ public Object getCallerMetadata()
   @Override
   public String toString()
   {
-    return "AppenderatorDriverMetadata{" +
+    return "SegmentTrackerMetadata{" +
            "activeSegments=" + activeSegments +
            ", publishPendingSegments=" + publishPendingSegments +
            ", lastSegmentIds=" + lastSegmentIds +
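Commentary: SegmentTracker now owns the allocation state that previously lived inside AppenderatorDriver (see the deletions below); the driver keeps its public API and delegates. The typical lifecycle, as the driver exercises it, sketched with the class's own methods:

    final SegmentTracker tracker = new SegmentTracker(segmentAllocator);
    // On add: returns the active segment for the row, allocating (and logging) on first use.
    final SegmentIdentifier id = tracker.getSegment(row, sequenceName); // may be null if allocation fails
    // When a segment should stop receiving data but is not yet published:
    tracker.moveSegmentOut(sequenceName, ImmutableList.of(id));
    // On publish: drain everything not yet published, across all sequences.
    final List<SegmentIdentifier> toPublish = tracker.removePublished();
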
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java
index f003c01c5bf6..bd5d3d150b0d 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorDriver.java
@@ -26,10 +26,7 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
@@ -44,21 +41,16 @@
 import io.druid.java.util.common.logger.Logger;
 import io.druid.query.SegmentDescriptor;
 import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.SegmentTracker;
+import io.druid.segment.realtime.SegmentTrackerMetadata;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
 import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
-import org.joda.time.DateTime;

 import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Objects;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;

@@ -78,26 +70,11 @@ public class AppenderatorDriver implements Closeable
   private static final Logger log = new Logger(AppenderatorDriver.class);

   private final Appenderator appenderator;
-  private final SegmentAllocator segmentAllocator;
+  private final SegmentTracker segmentTracker;
   private final SegmentHandoffNotifier handoffNotifier;
   private final UsedSegmentChecker usedSegmentChecker;
   private final ObjectMapper objectMapper;
   private final FireDepartmentMetrics metrics;
-
-  // All access to "activeSegments", "publishPendingSegments", and "lastSegmentId" must be synchronized on
-  // "activeSegments".
-
-  // sequenceName -> start of segment interval -> segment we're currently adding data to
-  private final Map<String, NavigableMap<Long, SegmentIdentifier>> activeSegments = new TreeMap<>();
-
-  // sequenceName -> list of identifiers of segments waiting for being published
-  // publishPendingSegments is always a super set of activeSegments because there can be some segments to which data
-  // are not added anymore, but not published yet.
-  private final Map<String, List<SegmentIdentifier>> publishPendingSegments = new HashMap<>();
-
-  // sequenceName -> most recently allocated segment
-  private final Map<String, String> lastSegmentIds = Maps.newHashMap();
-
   private final ListeningExecutorService publishExecutor;

   /**
@@ -120,7 +97,9 @@ public AppenderatorDriver(
   )
   {
     this.appenderator = Preconditions.checkNotNull(appenderator, "appenderator");
-    this.segmentAllocator = Preconditions.checkNotNull(segmentAllocator, "segmentAllocator");
+    this.segmentTracker = new SegmentTracker(
+        Preconditions.checkNotNull(segmentAllocator, "segmentAllocator")
+    );
     this.handoffNotifier = Preconditions.checkNotNull(handoffNotifierFactory, "handoffNotifierFactory")
                                         .createSegmentHandoffNotifier(appenderator.getDataSource());
     this.usedSegmentChecker = Preconditions.checkNotNull(usedSegmentChecker, "usedSegmentChecker");
@@ -130,15 +109,9 @@ public AppenderatorDriver(
   }

   @VisibleForTesting
-  Map<String, NavigableMap<Long, SegmentIdentifier>> getActiveSegments()
+  SegmentTracker getSegmentTracker()
   {
-    return activeSegments;
-  }
-
-  @VisibleForTesting
-  Map<String, List<SegmentIdentifier>> getPublishPendingSegments()
-  {
-    return publishPendingSegments;
+    return segmentTracker;
   }

   /**
@@ -153,28 +126,15 @@ public Object startJob()
   {
     handoffNotifier.start();

-    final AppenderatorDriverMetadata metadata = objectMapper.convertValue(
+    final SegmentTrackerMetadata metadata = objectMapper.convertValue(
         appenderator.startJob(),
-        AppenderatorDriverMetadata.class
+        SegmentTrackerMetadata.class
     );

     log.info("Restored metadata[%s].", metadata);

     if (metadata != null) {
-      synchronized (activeSegments) {
-        for (Map.Entry<String, List<SegmentIdentifier>> entry : metadata.getActiveSegments().entrySet()) {
-          final String sequenceName = entry.getKey();
-          final TreeMap<Long, SegmentIdentifier> segmentMap = Maps.newTreeMap();
-
-          activeSegments.put(sequenceName, segmentMap);
-
-          for (SegmentIdentifier identifier : entry.getValue()) {
-            segmentMap.put(identifier.getInterval().getStartMillis(), identifier);
-          }
-        }
-        publishPendingSegments.putAll(metadata.getPublishPendingSegments());
-        lastSegmentIds.putAll(metadata.getLastSegmentIds());
-      }
+      segmentTracker.restoreFromMetadata(metadata);

       return metadata.getCallerMetadata();
     } else {
@@ -182,26 +142,12 @@ public Object startJob()
     }
   }

-  private void addSegment(String sequenceName, SegmentIdentifier identifier)
-  {
-    synchronized (activeSegments) {
-      activeSegments.computeIfAbsent(sequenceName, k -> new TreeMap<>())
-                    .putIfAbsent(identifier.getInterval().getStartMillis(), identifier);
-
-      publishPendingSegments.computeIfAbsent(sequenceName, k -> new ArrayList<>())
-                            .add(identifier);
-      lastSegmentIds.put(sequenceName, identifier.getIdentifierAsString());
-    }
-  }
-
   /**
    * Clears out all our state and also calls {@link Appenderator#clear()} on the underlying Appenderator.
    */
   public void clear() throws InterruptedException
   {
-    synchronized (activeSegments) {
-      activeSegments.clear();
-    }
+    segmentTracker.clear();
     appenderator.clear();
   }

@@ -226,7 +172,7 @@ public AppenderatorDriverAddResult add(
     Preconditions.checkNotNull(sequenceName, "sequenceName");
     Preconditions.checkNotNull(committerSupplier, "committerSupplier");

-    final SegmentIdentifier identifier = getSegment(row, sequenceName);
+    final SegmentIdentifier identifier = segmentTracker.getSegment(row, sequenceName);

     if (identifier != null) {
       try {
@@ -276,7 +222,7 @@ public Object persist(final Committer committer) throws InterruptedException
    *
    * @return null if the input segmentsAndMetadata is null. Otherwise, a {@link ListenableFuture} for the submitted task
    * which returns {@link SegmentsAndMetadata} containing the segments successfully handed off and the metadata
-   * of the caller of {@link AppenderatorDriverMetadata}
+   * of the caller of {@link SegmentTrackerMetadata}
    */
   public ListenableFuture<SegmentsAndMetadata> registerHandoff(SegmentsAndMetadata segmentsAndMetadata)
   {
@@ -292,7 +238,7 @@ public ListenableFuture<SegmentsAndMetadata> registerHandoff(SegmentsAndMetadata segmentsAndMetadata)
       return Futures.immediateFuture(
           new SegmentsAndMetadata(
               segmentsAndMetadata.getSegments(),
-              ((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata())
+              ((SegmentTrackerMetadata) segmentsAndMetadata.getCommitMetadata())
                   .getCallerMetadata()
           )
       );
@@ -328,7 +274,7 @@ public void onSuccess(Object result)
               resultFuture.set(
                   new SegmentsAndMetadata(
                       segmentsAndMetadata.getSegments(),
-                      ((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata())
+                      ((SegmentTrackerMetadata) segmentsAndMetadata.getCommitMetadata())
                           .getCallerMetadata()
                   )
               );
@@ -362,91 +308,12 @@ public void close()
     handoffNotifier.close();
   }

-  private SegmentIdentifier getActiveSegment(final DateTime timestamp, final String sequenceName)
-  {
-    synchronized (activeSegments) {
-      final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
-
-      if (activeSegmentsForSequence == null) {
-        return null;
-      }
-
-      final Map.Entry<Long, SegmentIdentifier> candidateEntry = activeSegmentsForSequence.floorEntry(timestamp.getMillis());
-      if (candidateEntry != null && candidateEntry.getValue().getInterval().contains(timestamp)) {
-        return candidateEntry.getValue();
-      } else {
-        return null;
-      }
-    }
-  }
-
-  /**
-   * Return a segment usable for "timestamp". May return null if no segment can be allocated.
-   *
-   * @param row          input row
-   * @param sequenceName sequenceName for potential segment allocation
-   *
-   * @return identifier, or null
-   *
-   * @throws IOException if an exception occurs while allocating a segment
-   */
-  private SegmentIdentifier getSegment(final InputRow row, final String sequenceName) throws IOException
-  {
-    synchronized (activeSegments) {
-      final DateTime timestamp = row.getTimestamp();
-      final SegmentIdentifier existing = getActiveSegment(timestamp, sequenceName);
-      if (existing != null) {
-        return existing;
-      } else {
-        // Allocate new segment.
-        final SegmentIdentifier newSegment = segmentAllocator.allocate(
-            row,
-            sequenceName,
-            lastSegmentIds.get(sequenceName)
-        );
-
-        if (newSegment != null) {
-          for (SegmentIdentifier identifier : appenderator.getSegments()) {
-            if (identifier.equals(newSegment)) {
-              throw new ISE(
-                  "WTF?! Allocated segment[%s] which conflicts with existing segment[%s].",
-                  newSegment,
-                  identifier
-              );
-            }
-          }
-
-          log.info("New segment[%s] for sequenceName[%s].", newSegment, sequenceName);
-          addSegment(sequenceName, newSegment);
-        } else {
-          // Well, we tried.
-          log.warn("Cannot allocate segment for timestamp[%s], sequenceName[%s]. ", timestamp, sequenceName);
-        }
-
-        return newSegment;
-      }
-    }
-  }
-
   /**
    * Move a set of identifiers out from "active", making way for newer segments.
    */
   public void moveSegmentOut(final String sequenceName, final List<SegmentIdentifier> identifiers)
   {
-    synchronized (activeSegments) {
-      final NavigableMap<Long, SegmentIdentifier> activeSegmentsForSequence = activeSegments.get(sequenceName);
-      if (activeSegmentsForSequence == null) {
-        throw new ISE("WTF?! Asked to remove segments for sequenceName[%s] which doesn't exist...", sequenceName);
-      }
-
-      for (final SegmentIdentifier identifier : identifiers) {
-        log.info("Moving segment[%s] out of active list.", identifier);
-        final long key = identifier.getInterval().getStartMillis();
-        if (!activeSegmentsForSequence.remove(key).equals(identifier)) {
-          throw new ISE("WTF?! Asked to remove segment[%s] that didn't exist...", identifier);
-        }
-      }
-    }
+    segmentTracker.moveSegmentOut(sequenceName, identifiers);
   }

   /**
@@ -463,16 +330,7 @@ public ListenableFuture<SegmentsAndMetadata> publishAll(
       final Committer committer
   )
   {
-    final List<SegmentIdentifier> theSegments;
-    synchronized (activeSegments) {
-      final List<String> sequenceNames = ImmutableList.copyOf(publishPendingSegments.keySet());
-      theSegments = sequenceNames.stream()
-                                 .map(publishPendingSegments::remove)
-                                 .filter(Objects::nonNull)
-                                 .flatMap(Collection::stream)
-                                 .collect(Collectors.toList());
-      sequenceNames.forEach(activeSegments::remove);
-    }
+    final List<SegmentIdentifier> theSegments = segmentTracker.removePublished();

     return publish(publisher, wrapCommitter(committer), theSegments);
   }
@@ -493,15 +351,7 @@ public ListenableFuture<SegmentsAndMetadata> publish(
       final Collection<String> sequenceNames
   )
   {
-    final List<SegmentIdentifier> theSegments;
-    synchronized (activeSegments) {
-      theSegments = sequenceNames.stream()
-                                 .map(publishPendingSegments::remove)
-                                 .filter(Objects::nonNull)
-                                 .flatMap(Collection::stream)
-                                 .collect(Collectors.toList());
-      sequenceNames.forEach(activeSegments::remove);
-    }
+    final List<SegmentIdentifier> theSegments = segmentTracker.removePublished(sequenceNames);

     return publish(publisher, wrapCommitter(committer), theSegments);
   }
@@ -553,7 +403,7 @@ private ListenableFuture<SegmentsAndMetadata> publish(
             try {
               final boolean published = publisher.publishSegments(
                   ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
-                  ((AppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()
+                  ((SegmentTrackerMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()
               );

               if (published) {
@@ -603,27 +453,7 @@ private Supplier<Committer> wrapCommitterSupplier(final Supplier<Committer> committerSupplier)

   private WrappedCommitter wrapCommitter(final Committer committer)
   {
-    final AppenderatorDriverMetadata wrappedMetadata;
-    synchronized (activeSegments) {
-      wrappedMetadata = new AppenderatorDriverMetadata(
-          ImmutableMap.copyOf(
-              Maps.transformValues(
-                  activeSegments,
-                  new Function<NavigableMap<Long, SegmentIdentifier>, List<SegmentIdentifier>>()
-                  {
-                    @Override
-                    public List<SegmentIdentifier> apply(NavigableMap<Long, SegmentIdentifier> input)
-                    {
-                      return ImmutableList.copyOf(input.values());
-                    }
-                  }
-              )
-          ),
-          ImmutableMap.copyOf(publishPendingSegments),
-          ImmutableMap.copyOf(lastSegmentIds),
-          committer.getMetadata()
-      );
-    }
+    final SegmentTrackerMetadata wrappedMetadata = segmentTracker.wrapMetadata(committer.getMetadata());

     return new WrappedCommitter()
     {
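Commentary: the driver serializes the tracker's state with every commit and restores it in startJob(), which is what lets a restarted task resume its sequences. A sketch of the round trip, assuming the appenderator persists whatever Object the committer carries:

    // Commit side: tracker state plus the caller's own metadata, captured as one immutable snapshot.
    final SegmentTrackerMetadata wrapped = tracker.wrapMetadata(committer.getMetadata());
    // Restart side: startJob() reads the snapshot back and rebuilds activeSegments and lastSegmentIds.
    tracker.restoreFromMetadata(wrapped);
    final Object callerMetadata = wrapped.getCallerMetadata(); // handed back to the caller
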
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
index a6d9be501217..10a833229503 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java
@@ -774,7 +774,7 @@ private Object bootstrapSinksFromDisk()

       try {
         final SegmentIdentifier identifier = objectMapper.readValue(
-            new File(sinkDir, "identifier.json"),
+            new File(sinkDir, IDENTIFIER_FILE_NAME),
             SegmentIdentifier.class
         );

diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java
index c3e8678ba304..81fa87d0b23d 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumber.java
@@ -145,7 +145,7 @@ public Object startJob()
   }

   @Override
-  public int add(InputRow row, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
+  public int add(InputRow row, String sequenceName, Supplier<Committer> committerSupplier) throws IndexSizeExceededException
   {
     final SegmentIdentifier identifier = getSegmentIdentifier(row.getTimestampFromEpoch());
     if (identifier == null) {
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberSchool.java
index 66ddd9d4747c..9fdd8de777db 100644
--- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberSchool.java
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberSchool.java
@@ -55,6 +55,7 @@ public AppenderatorPlumberSchool(

   @Override
   public Plumber findPlumber(
+      final SegmentAllocator segmentAllocator,
       final DataSchema schema,
       final RealtimeTuningConfig config,
       final FireDepartmentMetrics metrics
diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocator.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocator.java
new file mode 100644
index 000000000000..69f9eeac99a8
--- /dev/null
+++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocator.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.segment.realtime.appenderator;
+
+import io.druid.data.input.InputRow;
+import io.druid.java.util.common.granularity.Granularity;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.realtime.plumber.VersioningPolicy;
+import io.druid.timeline.partition.ShardSpec;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import java.io.IOException;
+
+/**
+ * Creates segments based on the realtime configuration spec
+ */
+public class SpecBasedSegmentAllocator implements SegmentAllocator
+{
+  private final String dataSource;
+  private final VersioningPolicy versioningPolicy;
+  private final Granularity segmentGranularity;
+  private final ShardSpec shardSpec;
+
+  public SpecBasedSegmentAllocator(DataSchema schema, RealtimeTuningConfig tuningConfig)
+  {
+    this.dataSource = schema.getDataSource();
+    this.versioningPolicy = tuningConfig.getVersioningPolicy();
+    this.segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
+    this.shardSpec = tuningConfig.getShardSpec();
+  }
+
+  @Override
+  public SegmentIdentifier allocate(
+      InputRow row, String sequenceName, String previousSegmentId
+  ) throws IOException
+  {
+    final DateTime intervalStart = segmentGranularity.bucketStart(row.getTimestamp());
+
+    final Interval interval = new Interval(
+        intervalStart,
+        segmentGranularity.increment(intervalStart)
+    );
+
+    String version = versioningPolicy.getVersion(interval);
+
+    return new SegmentIdentifier(dataSource, interval, version, shardSpec);
+  }
+}
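Commentary: the allocate() above reproduces the arithmetic the old plumbers used to key their sinks, so nothing changes about how rows map to intervals. For example, with DAY segment granularity a row at 2017-08-15T13:00:00Z maps to the interval 2017-08-15/2017-08-16 (illustrative values, plain Joda types):

    final DateTime ts = new DateTime("2017-08-15T13:00:00Z", DateTimeZone.UTC);
    final DateTime start = Granularities.DAY.bucketStart(ts); // 2017-08-15T00:00:00.000Z
    final Interval interval = new Interval(start, Granularities.DAY.increment(start));
    // interval -> 2017-08-15T00:00:00.000Z/2017-08-16T00:00:00.000Z
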
(Map.Entry entry : getSinks().entrySet()) { - final Long intervalStart = entry.getKey(); - if (intervalStart < minTimestamp) { + List> sinksToPush = Lists.newArrayList(); + for (Map.Entry entry : getSinks().entrySet()) { + DateTime intervalStart = entry.getKey().getInterval().getStart(); + if (intervalStart.isBefore(minTimestamp)) { log.info("Adding entry[%s] to flush.", entry); sinksToPush.add(entry); } } - for (final Map.Entry entry : sinksToPush) { + for (final Map.Entry entry : sinksToPush) { flushAfterDuration(entry.getKey(), entry.getValue()); } @@ -214,7 +217,7 @@ public void finishJob() { log.info("Stopping job"); - for (final Map.Entry entry : getSinks().entrySet()) { + for (final Map.Entry entry : getSinks().entrySet()) { abandonSegment(entry.getKey(), entry.getValue()); } shutdownExecutors(); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java index 11416650ff9a..a9b5a699c33b 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/FlushingPlumberSchool.java @@ -34,6 +34,7 @@ import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.server.coordination.DataSegmentAnnouncer; import org.joda.time.Duration; @@ -102,6 +103,7 @@ public FlushingPlumberSchool( @Override public Plumber findPlumber( + final SegmentAllocator segmentAllocator, final DataSchema schema, final RealtimeTuningConfig config, final FireDepartmentMetrics metrics @@ -122,7 +124,8 @@ public Plumber findPlumber( indexIO, cache, cacheConfig, - objectMapper + objectMapper, + segmentAllocator ); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java index d9ff2342d3b8..916b5503c4fe 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumber.java @@ -38,12 +38,13 @@ public interface Plumber /** * @param row the row to insert + * @param sequenceName name of the sequence used to allocate this row's segment * @param committerSupplier supplier of a committer associated with all data that has been added, including this row * * @return - positive numbers indicate how many summarized rows exist in the index for that timestamp, * -1 means a row was thrown away because it was too late */ - int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException; + int add(InputRow row, String sequenceName, Supplier committerSupplier) throws IndexSizeExceededException; QueryRunner getQueryRunner(Query query); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java index 8c49822b60d5..02e25aae3a0c 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/PlumberSchool.java @@ -24,6 +24,7 @@ import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.appenderator.SegmentAllocator; /** */ @@ -39,6 +40,11 @@ public interface PlumberSchool * *
@return returns a plumber */ - public Plumber findPlumber(DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics); + Plumber findPlumber( + SegmentAllocator segmentAllocator, + DataSchema schema, + RealtimeTuningConfig config, + FireDepartmentMetrics metrics + ); } diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java b/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java index d1053ea2a1da..23499eeb4828 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Plumbers.java @@ -41,6 +41,7 @@ private Plumbers() public static void addNextRow( final Supplier committerSupplier, + final String sequenceName, final Firehose firehose, final Plumber plumber, final boolean reportParseExceptions, @@ -73,7 +74,7 @@ public static void addNextRow( final int numRows; try { - numRows = plumber.add(inputRow, committerSupplier); + numRows = plumber.add(inputRow, sequenceName, committerSupplier); } catch (IndexSizeExceededException e) { // Shouldn't happen if this is only being called by a single thread. diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 40fe89057edd..a08348817e31 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -46,6 +46,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.Intervals; import io.druid.java.util.common.Pair; +import io.druid.java.util.common.RE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.java.util.common.granularity.Granularity; @@ -69,6 +70,10 @@ import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireHydrant; import io.druid.segment.realtime.SegmentPublisher; +import io.druid.segment.realtime.SegmentTracker; +import io.druid.segment.realtime.SegmentTrackerMetadata; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.appenderator.SinkQuerySegmentWalker; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; @@ -98,6 +103,8 @@ public class RealtimePlumber implements Plumber { private static final EmittingLogger log = new EmittingLogger(RealtimePlumber.class); private static final int WARN_DELAY = 1000; + private static final String IDENTIFIER_FILE_NAME = "identifier.json"; + private static final String SEGMENT_TRACKING_FILE_NAME = "segment-tracking.json"; private final DataSchema schema; private final RealtimeTuningConfig config; @@ -108,11 +115,13 @@ public class RealtimePlumber implements Plumber private final SegmentPublisher segmentPublisher; private final SegmentHandoffNotifier handoffNotifier; private final Object handoffCondition = new Object(); - private final Map sinks = Maps.newConcurrentMap(); + private final Map sinks = Maps.newConcurrentMap(); private final VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline( String.CASE_INSENSITIVE_ORDER ); private final QuerySegmentWalker texasRanger; + private final SegmentTracker segmentTracker; + private final ObjectMapper objectMapper; private final Cache cache; @@ -144,7 +153,8 @@ public RealtimePlumber( IndexIO indexIO, 
Cache cache, CacheConfig cacheConfig, - ObjectMapper objectMapper + ObjectMapper objectMapper, + SegmentAllocator segmentAllocator ) { this.schema = schema; @@ -168,6 +178,8 @@ public RealtimePlumber( cache, cacheConfig ); + this.segmentTracker = new SegmentTracker(segmentAllocator); + this.objectMapper = objectMapper; log.info("Creating plumber using rejectionPolicy[%s]", getRejectionPolicy()); } @@ -187,7 +199,7 @@ public RejectionPolicy getRejectionPolicy() return rejectionPolicy; } - public Map getSinks() + protected Map getSinks() { return sinks; } @@ -207,10 +219,23 @@ public Object startJob() } @Override - public int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException + public int add(InputRow row, String sequenceName, Supplier committerSupplier) throws IndexSizeExceededException { long messageTimestamp = row.getTimestampFromEpoch(); - final Sink sink = getSink(messageTimestamp); + if (!rejectionPolicy.accept(messageTimestamp)) { + return -1; + } + + final SegmentIdentifier identifier; + + try { + identifier = segmentTracker.getSegment(row, sequenceName); + } + catch (IOException ex) { + throw new RE(ex, "Could not allocate segment for %s", row.getTimestamp()); + } + + final Sink sink = getSink(identifier); metrics.reportMessageMaxTimestamp(messageTimestamp); if (sink == null) { return -1; @@ -225,35 +250,21 @@ public int add(InputRow row, Supplier committerSupplier) throws Index return numRows; } - private Sink getSink(long timestamp) + private Sink getSink(SegmentIdentifier identifier) { - if (!rejectionPolicy.accept(timestamp)) { - return null; - } - - final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity(); - final VersioningPolicy versioningPolicy = config.getVersioningPolicy(); - - DateTime truncatedDateTime = segmentGranularity.bucketStart(DateTimes.utc(timestamp)); - final long truncatedTime = truncatedDateTime.getMillis(); - - Sink retVal = sinks.get(truncatedTime); + Sink retVal = sinks.get(identifier); if (retVal == null) { - final Interval sinkInterval = new Interval( - truncatedDateTime, - segmentGranularity.increment(truncatedDateTime) - ); - retVal = new Sink( - sinkInterval, + identifier.getInterval(), schema, - config.getShardSpec(), - versioningPolicy.getVersion(sinkInterval), + identifier.getShardSpec(), + identifier.getVersion(), config.getMaxRowsInMemory(), config.isReportParseExceptions() ); - addSink(retVal); + + addSink(identifier, retVal); } @@ -270,10 +281,11 @@ public QueryRunner getQueryRunner(final Query query) @Override public void persist(final Committer committer) { - final List> indexesToPersist = Lists.newArrayList(); - for (Sink sink : sinks.values()) { + final List> indexesToPersist = Lists.newArrayList(); + for (Map.Entry entry : sinks.entrySet()) { + final Sink sink = entry.getValue(); if (sink.swappable()) { - indexesToPersist.add(Pair.of(sink.swap(), sink.getInterval())); + indexesToPersist.add(Pair.of(sink.swap(), entry.getKey())); } } @@ -323,7 +335,7 @@ handed off instead of individual segments being handed off (that is, if one of t */ long persistThreadCpuTime = VMUtils.safeGetThreadCpuTime(); try { - for (Pair pair : indexesToPersist) { + for (Pair pair : indexesToPersist) { metrics.incrementRowOutputCount( persistHydrant( pair.lhs, schema, pair.rhs, metadataElems @@ -356,10 +368,10 @@ handed off instead of individual segments being handed off (that is, if one of t } // Submits persist-n-merge task for a Sink to the mergeExecutor - private void persistAndMerge(final long
truncatedTime, final Sink sink) + private void persistAndMerge(final SegmentIdentifier identifier, final Sink sink) { final String threadName = StringUtils.format( - "%s-%s-persist-n-merge", schema.getDataSource(), DateTimes.utc(truncatedTime) + "%s-%s-persist-n-merge", schema.getDataSource(), identifier ); mergeExecutor.execute( new ThreadRenamingRunnable(threadName) @@ -372,7 +384,7 @@ public void doRun() { try { // Bail out if this sink has been abandoned by a previously-executed task. - if (sinks.get(truncatedTime) != sink) { + if (sinks.get(identifier) != sink) { log.info("Sink[%s] was abandoned, bailing out of persist-n-merge.", sink); return; } @@ -402,7 +414,7 @@ public void doRun() synchronized (hydrant) { if (!hydrant.hasSwapped()) { log.info("Hydrant[%s] hasn't swapped yet, swapping. Sink[%s]", hydrant, sink); - final int rowCount = persistHydrant(hydrant, schema, interval, null); + final int rowCount = persistHydrant(hydrant, schema, identifier, null); metrics.incrementRowOutputCount(rowCount); } } @@ -468,7 +480,7 @@ public void doRun() // We're trying to shut down, and this segment failed to push. Let's just get rid of it. // This call will also delete possibly-partially-written files, so we don't need to do it explicitly. cleanShutdown = false; - abandonSegment(truncatedTime, sink); + abandonSegment(identifier, sink); } } finally { @@ -480,13 +492,13 @@ public void doRun() } ); handoffNotifier.registerSegmentHandoffCallback( - new SegmentDescriptor(sink.getInterval(), sink.getVersion(), config.getShardSpec().getPartitionNum()), + new SegmentDescriptor(sink.getInterval(), sink.getVersion(), identifier.getShardSpec().getPartitionNum()), mergeExecutor, new Runnable() { @Override public void run() { - abandonSegment(sink.getInterval().getStartMillis(), sink); + abandonSegment(identifier, sink); metrics.incrementHandOffCount(); } } @@ -500,7 +512,7 @@ public void finishJob() shuttingDown = true; - for (final Map.Entry entry : sinks.entrySet()) { + for (final Map.Entry entry : sinks.entrySet()) { persistAndMerge(entry.getKey(), entry.getValue()); } @@ -600,8 +612,6 @@ protected void shutdownExecutors() protected Object bootstrapSinksFromDisk() { - final VersioningPolicy versioningPolicy = config.getVersioningPolicy(); - File baseDir = computeBaseDir(schema); if (baseDir == null || !baseDir.exists()) { return null; @@ -615,6 +625,50 @@ protected Object bootstrapSinksFromDisk() Object metadata = null; long latestCommitTime = 0; for (File sinkDir : files) { + final File identifierFile = new File(sinkDir, IDENTIFIER_FILE_NAME); + if (!identifierFile.isFile()) { + // No identifier in this sinkDir; it must not actually be a sink directory. Skip it. 
+ continue; } + + final SegmentIdentifier identifier; + + try { + identifier = objectMapper.readValue( + identifierFile, + SegmentIdentifier.class + ); + } + catch (IOException e) { + log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource()) + .addData("sinkDir", sinkDir) + .emit(); + continue; + } + + final File segmentTrackingFile = new File(sinkDir, SEGMENT_TRACKING_FILE_NAME); + if (!segmentTrackingFile.isFile()) { + // No segment tracking metadata in this sinkDir; skip it. + continue; + } + + final SegmentTrackerMetadata segmentTrackerMetadata; + + try { + segmentTrackerMetadata = objectMapper.readValue( + segmentTrackingFile, + SegmentTrackerMetadata.class + ); + + segmentTracker.restoreFromMetadata(segmentTrackerMetadata); + } + catch (IOException e) { + log.makeAlert(e, "Problem loading sink[%s] from disk.", schema.getDataSource()) + .addData("sinkDir", sinkDir) + .emit(); + continue; + } + final Interval sinkInterval = Intervals.of(sinkDir.getName().replace("_", "/")); //final File[] sinkFiles = sinkDir.listFiles(); @@ -697,13 +751,7 @@ public int compare(File o1, File o2) hydrants.add( new FireHydrant( new QueryableIndexSegment( - DataSegment.makeDataSegmentIdentifier( - schema.getDataSource(), - sinkInterval.getStart(), - sinkInterval.getEnd(), - versioningPolicy.getVersion(sinkInterval), - config.getShardSpec() - ), + identifier.getIdentifierAsString(), queryableIndex ), Integer.parseInt(segmentDir.getName()) @@ -721,20 +769,20 @@ public int compare(File o1, File o2) final Sink currSink = new Sink( sinkInterval, schema, - config.getShardSpec(), - versioningPolicy.getVersion(sinkInterval), + identifier.getShardSpec(), + identifier.getVersion(), config.getMaxRowsInMemory(), config.isReportParseExceptions(), hydrants ); - addSink(currSink); + addSink(identifier, currSink); } return metadata; } - private void addSink(final Sink sink) + private void addSink(SegmentIdentifier identifier, final Sink sink) { - sinks.put(sink.getInterval().getStartMillis(), sink); + sinks.put(identifier, sink); metrics.setSinkCount(sinks.size()); sinkTimeline.add( sink.getInterval(), @@ -824,17 +872,17 @@ private void mergeAndPush() minTimestampAsDate ); - List> sinksToPush = Lists.newArrayList(); - for (Map.Entry entry : sinks.entrySet()) { - final Long intervalStart = entry.getKey(); - if (intervalStart < minTimestamp) { + List> sinksToPush = Lists.newArrayList(); + for (Map.Entry entry : sinks.entrySet()) { + final DateTime intervalStart = entry.getKey().getInterval().getStart(); + if (intervalStart.isBefore(minTimestamp)) { log.info("Adding entry [%s] for merge and push.", entry); sinksToPush.add(entry); } else { log.info( "Skipping persist and merge for entry [%s] : Start time [%s] >= [%s] min timestamp required in this run. Segment will be picked up in a future run.", entry, - DateTimes.utc(intervalStart), + intervalStart, minTimestampAsDate ); } @@ -842,7 +890,7 @@ private void mergeAndPush() log.info("Found [%,d] sinks to persist and merge", sinksToPush.size()); - for (final Map.Entry entry : sinksToPush) { + for (final Map.Entry entry : sinksToPush) { persistAndMerge(entry.getKey(), entry.getValue()); } } @@ -852,17 +900,17 @@ private void mergeAndPush() * from the single-threaded mergeExecutor, since otherwise chaos may ensue if merged segments are deleted while * being created.
* - * @param truncatedTime sink key + * @param identifier sink key * @param sink sink to unannounce */ - protected void abandonSegment(final long truncatedTime, final Sink sink) + protected void abandonSegment(final SegmentIdentifier identifier, final Sink sink) { - if (sinks.containsKey(truncatedTime)) { + if (sinks.containsKey(identifier)) { try { segmentAnnouncer.unannounceSegment(sink.getSegment()); removeSegment(sink, computePersistDir(schema, sink.getInterval())); - log.info("Removing sinkKey %d for segment %s", truncatedTime, sink.getSegment().getIdentifier()); - sinks.remove(truncatedTime); + log.info("Removing sinkKey %s for segment %s", identifier, sink.getSegment().getIdentifier()); + sinks.remove(identifier); metrics.setSinkCount(sinks.size()); sinkTimeline.remove( sink.getInterval(), @@ -908,14 +956,14 @@ protected File computePersistDir(DataSchema schema, Interval interval) * * @param indexToPersist hydrant to persist * @param schema datasource schema - * @param interval interval to persist + * @param identifier segment identifier to persist * * @return the number of rows persisted */ protected int persistHydrant( FireHydrant indexToPersist, DataSchema schema, - Interval interval, + SegmentIdentifier identifier, Map metadataElems ) { @@ -923,7 +971,7 @@ protected int persistHydrant( if (indexToPersist.hasSwapped()) { log.info( "DataSource[%s], Interval[%s], Hydrant[%s] already swapped. Ignoring request to persist.", - schema.getDataSource(), interval, indexToPersist + schema.getDataSource(), identifier.getInterval(), indexToPersist ); return 0; } @@ -931,7 +979,7 @@ protected int persistHydrant( log.info( "DataSource[%s], Interval[%s], Metadata [%s] persisting Hydrant[%s]", schema.getDataSource(), - interval, + identifier.getInterval(), metadataElems, indexToPersist ); @@ -941,13 +989,22 @@ protected int persistHydrant( final IndexSpec indexSpec = config.getIndexSpec(); indexToPersist.getIndex().getMetadata().putAll(metadataElems); + + final File persistDir = computePersistDir(schema, identifier.getInterval()); final File persistedFile = indexMerger.persist( indexToPersist.getIndex(), - interval, - new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), + identifier.getInterval(), + new File(persistDir, String.valueOf(indexToPersist.getCount())), indexSpec ); + final File identifierFile = new File(persistDir, IDENTIFIER_FILE_NAME); + objectMapper.writeValue(identifierFile, identifier); + + final File segmentTrackingMetadataFile = new File(persistDir, SEGMENT_TRACKING_FILE_NAME); + final SegmentTrackerMetadata segmentTrackerMetadata = segmentTracker.wrapMetadata(null); + objectMapper.writeValue(segmentTrackingMetadataFile, segmentTrackerMetadata); + indexToPersist.swapSegment( new QueryableIndexSegment( indexToPersist.getSegmentIdentifier(), @@ -958,7 +1015,7 @@ protected int persistHydrant( } catch (IOException e) { log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource()) - .addData("interval", interval) + .addData("interval", identifier.getInterval()) .addData("count", indexToPersist.getCount()) .emit(); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java index 8aaf0f2bf7a2..c16f2a86a247 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumberSchool.java @@ -35,6 +35,7 @@ 
import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.SegmentPublisher; +import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.server.coordination.DataSegmentAnnouncer; import java.util.concurrent.ExecutorService; @@ -89,6 +90,7 @@ public RealtimePlumberSchool( @Override public Plumber findPlumber( + final SegmentAllocator segmentAllocator, final DataSchema schema, final RealtimeTuningConfig config, final FireDepartmentMetrics metrics @@ -111,7 +113,8 @@ public Plumber findPlumber( indexIO, cache, cacheConfig, - objectMapper + objectMapper, + segmentAllocator ); } diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 90b1e66f4b24..aa4d2abd2113 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -70,6 +70,7 @@ import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.segment.realtime.plumber.Sink; @@ -167,7 +168,7 @@ public Firehose connect(InputRowParser parser, File temporaryDirectory) throws I { @Override public Plumber findPlumber( - DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + SegmentAllocator segmentAllocator, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics ) { return plumber; @@ -181,7 +182,7 @@ public Plumber findPlumber( { @Override public Plumber findPlumber( - DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + SegmentAllocator segmentAllocator, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics ) { return plumber2; @@ -373,12 +374,12 @@ public Firehose connect(InputRowParser parser, File temporaryDirectory) throws I return firehose; } }, - (schema, config, metrics) -> plumber, + (segmentAllocator, schema, config, metrics) -> plumber, null ); RealtimeIOConfig ioConfig2 = new RealtimeIOConfig( null, - (schema, config, metrics) -> plumber2, + (segmentAllocator, schema, config, metrics) -> plumber2, (parser, arg) -> firehoseV2 ); @@ -417,7 +418,7 @@ public Firehose connect(InputRowParser parser, File temporaryDirectory) throws I return firehose; } }, - (schema, config, metrics) -> plumber, + (segmentAllocator, schema, config, metrics) -> plumber, null ); @@ -1043,7 +1044,7 @@ public Object startJob() } @Override - public int add(InputRow row, Supplier committerSupplier) throws IndexSizeExceededException + public int add(InputRow row, String sequenceName, Supplier committerSupplier) throws IndexSizeExceededException { if (row == null) { return -1; diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java index e1237ef16966..73a8fa35b6b2 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverFailTest.java @@ -41,7 +41,6 @@ import 
io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestCommitterSupplier; -import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestSegmentAllocator; import io.druid.segment.realtime.appenderator.AppenderatorDriverTest.TestSegmentHandoffNotifierFactory; import io.druid.timeline.DataSegment; import org.hamcrest.CoreMatchers; @@ -99,7 +98,7 @@ public class AppenderatorDriverFailTest @Before public void setUp() { - allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR); + allocator = new TestSegmentAllocator(DATA_SOURCE, "abc123", Granularities.HOUR); segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory(); } diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java index d6430b6acff6..11b29c81f3ed 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorDriverTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; @@ -38,7 +37,6 @@ import io.druid.java.util.common.Intervals; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.granularity.Granularities; -import io.druid.java.util.common.granularity.Granularity; import io.druid.query.SegmentDescriptor; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.plumber.SegmentHandoffNotifier; @@ -48,7 +46,6 @@ import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.NumberedShardSpec; import io.druid.timeline.partition.PartitionChunk; -import org.joda.time.DateTime; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -57,13 +54,11 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; public class AppenderatorDriverTest @@ -103,7 +98,7 @@ public class AppenderatorDriverTest public void setUp() { appenderatorTester = new AppenderatorTester(MAX_ROWS_IN_MEMORY); - allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR); + allocator = new TestSegmentAllocator(DATA_SOURCE, VERSION, Granularities.HOUR); segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory(); driver = new AppenderatorDriver( appenderatorTester.getAppenderator(), @@ -139,8 +134,8 @@ public void testSimple() throws Exception committerSupplier.get(), ImmutableList.of("dummy") ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); - Assert.assertFalse(driver.getActiveSegments().containsKey("dummy")); - Assert.assertFalse(driver.getPublishPendingSegments().containsKey("dummy")); + Assert.assertFalse(driver.getSegmentTracker().getActiveSegments().containsKey("dummy")); + 
Assert.assertFalse(driver.getSegmentTracker().getPublishPendingSegments().containsKey("dummy")); final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published) .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS); @@ -186,8 +181,8 @@ public void testMaxRowsPerSegment() throws Exception committerSupplier.get(), ImmutableList.of("dummy") ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); - Assert.assertFalse(driver.getActiveSegments().containsKey("dummy")); - Assert.assertFalse(driver.getPublishPendingSegments().containsKey("dummy")); + Assert.assertFalse(driver.getSegmentTracker().getActiveSegments().containsKey("dummy")); + Assert.assertFalse(driver.getSegmentTracker().getPublishPendingSegments().containsKey("dummy")); final SegmentsAndMetadata segmentsAndMetadata = driver.registerHandoff(published) .get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS); Assert.assertEquals(numSegments, segmentsAndMetadata.getSegments().size()); @@ -212,8 +207,8 @@ public void testHandoffTimeout() throws Exception committerSupplier.get(), ImmutableList.of("dummy") ).get(PUBLISH_TIMEOUT, TimeUnit.MILLISECONDS); - Assert.assertFalse(driver.getActiveSegments().containsKey("dummy")); - Assert.assertFalse(driver.getPublishPendingSegments().containsKey("dummy")); + Assert.assertFalse(driver.getSegmentTracker().getActiveSegments().containsKey("dummy")); + Assert.assertFalse(driver.getSegmentTracker().getPublishPendingSegments().containsKey("dummy")); driver.registerHandoff(published).get(HANDOFF_CONDITION_TIMEOUT, TimeUnit.MILLISECONDS); } @@ -398,42 +393,6 @@ public void run() } } - static class TestSegmentAllocator implements SegmentAllocator - { - private final String dataSource; - private final Granularity granularity; - private final Map counters = Maps.newHashMap(); - - public TestSegmentAllocator(String dataSource, Granularity granularity) - { - this.dataSource = dataSource; - this.granularity = granularity; - } - - @Override - public SegmentIdentifier allocate( - final InputRow row, - final String sequenceName, - final String previousSegmentId - ) throws IOException - { - synchronized (counters) { - DateTime dateTimeTruncated = granularity.bucketStart(row.getTimestamp()); - final long timestampTruncated = dateTimeTruncated.getMillis(); - if (!counters.containsKey(timestampTruncated)) { - counters.put(timestampTruncated, new AtomicInteger()); - } - final int partitionNum = counters.get(timestampTruncated).getAndIncrement(); - return new SegmentIdentifier( - dataSource, - granularity.bucket(dateTimeTruncated), - VERSION, - new NumberedShardSpec(partitionNum, 0) - ); - } - } - } - static class TestSegmentHandoffNotifierFactory implements SegmentHandoffNotifierFactory { private boolean handoffEnabled = true; diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index 3a6dc6dcfb0a..ea4360442dc7 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -112,17 +112,17 @@ public void testSimpleIngestion() throws Exception commitMetadata.put("x", "1"); Assert.assertEquals( 1, - plumber.add(rows[0], null)); + plumber.add(rows[0], null, null)); commitMetadata.put("x", "2"); Assert.assertEquals( 2, - plumber.add(rows[1], null)); + plumber.add(rows[1], null, null)); commitMetadata.put("x", "3"); Assert.assertEquals( 3, - 
plumber.add(rows[2], null)); + plumber.add(rows[2], null, null)); Assert.assertEquals(1, plumber.getSegmentsView().size()); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocatorTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocatorTest.java new file mode 100644 index 000000000000..3f7d797d2fbf --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/SpecBasedSegmentAllocatorTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.appenderator; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.data.input.InputRow; +import io.druid.data.input.MapBasedInputRow; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.java.util.common.DateTimes; +import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.granularity.Granularities; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import io.druid.segment.realtime.plumber.CustomVersioningPolicy; +import io.druid.timeline.partition.LinearShardSpec; +import io.druid.timeline.partition.ShardSpec; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; + +public class SpecBasedSegmentAllocatorTest +{ + + @Test + public void basicTest() throws Exception + { + DataSchema schema = new DataSchema( + "dataSource", + Collections.emptyMap(), + new AggregatorFactory[] {}, + new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null), + new DefaultObjectMapper() + ); + + ShardSpec shardSpec = new LinearShardSpec(1); + RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( + 0, + null, + null, + null, + new CustomVersioningPolicy("testing"), + null, + null, + shardSpec, + null, + null, + 0, + 0, + null, + null, + null + ); + + SegmentAllocator allocator = new SpecBasedSegmentAllocator(schema, tuningConfig); + + InputRow row = new MapBasedInputRow( + DateTimes.of("2017-09-10T12:35:00.000Z"), + ImmutableList.of("dim1"), + ImmutableMap.of("dim1", "foo", "met1", "1") + ); + + SegmentIdentifier identifier = allocator.allocate(row, null, null); + + Assert.assertEquals("dataSource", identifier.getDataSource()); + Assert.assertEquals("testing", identifier.getVersion()); + Assert.assertEquals(shardSpec, identifier.getShardSpec()); + Assert.assertEquals( + Intervals.of("2017-09-10T00:00:00.000Z/2017-09-11T00:00:00.000Z"), + identifier.getInterval() + ); + } + +} diff --git 
a/server/src/test/java/io/druid/segment/realtime/appenderator/TestSegmentAllocator.java b/server/src/test/java/io/druid/segment/realtime/appenderator/TestSegmentAllocator.java new file mode 100644 index 000000000000..517f7fcfbf02 --- /dev/null +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/TestSegmentAllocator.java @@ -0,0 +1,68 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.segment.realtime.appenderator; + +import com.google.common.collect.Maps; +import io.druid.data.input.InputRow; +import io.druid.java.util.common.granularity.Granularity; +import io.druid.timeline.partition.NumberedShardSpec; +import org.joda.time.DateTime; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +public class TestSegmentAllocator implements SegmentAllocator +{ + private final String dataSource; + private final String version; + private final Granularity granularity; + private final Map counters = Maps.newHashMap(); + + public TestSegmentAllocator(String dataSource, String version, Granularity granularity) + { + this.dataSource = dataSource; + this.version = version; + this.granularity = granularity; + } + + @Override + public SegmentIdentifier allocate( + final InputRow row, + final String sequenceName, + final String previousSegmentId + ) throws IOException + { + synchronized (counters) { + DateTime dateTimeTruncated = granularity.bucketStart(row.getTimestamp()); + final long timestampTruncated = dateTimeTruncated.getMillis(); + if (!counters.containsKey(timestampTruncated)) { + counters.put(timestampTruncated, new AtomicInteger()); + } + final int partitionNum = counters.get(timestampTruncated).getAndIncrement(); + return new SegmentIdentifier( + dataSource, + granularity.bucket(dateTimeTruncated), + version, + new NumberedShardSpec(partitionNum, 0) + ); + } + } +} diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index a9a9c1ab48ea..bf627f91dd21 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -56,8 +56,11 @@ import io.druid.segment.realtime.FireDepartmentTest; import io.druid.segment.realtime.FireHydrant; import io.druid.segment.realtime.SegmentPublisher; +import io.druid.segment.realtime.appenderator.SegmentAllocator; +import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.server.coordination.DataSegmentAnnouncer; import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.NoneShardSpec; import org.apache.commons.io.FileUtils; 
import org.easymock.EasyMock; import org.joda.time.DateTime; @@ -98,6 +101,9 @@ public class RealtimePlumberSchoolTest private DataSchema schema2; private FireDepartmentMetrics metrics; private File tmpDir; + private SegmentAllocator segmentAllocator; + private SegmentIdentifier segmentIdentifier; + private String sequenceName; public RealtimePlumberSchoolTest(RejectionPolicyFactory rejectionPolicy) { @@ -183,6 +189,8 @@ public void setUp() throws Exception ) ).andReturn(true).anyTimes(); + segmentAllocator = EasyMock.createMock(SegmentAllocator.class); + emitter = EasyMock.createMock(ServiceEmitter.class); EasyMock.replay(announcer, segmentPublisher, dataSegmentPusher, handoffNotifierFactory, handoffNotifier, emitter); @@ -221,7 +229,9 @@ public void setUp() throws Exception ); metrics = new FireDepartmentMetrics(); - plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, metrics); + plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(segmentAllocator, schema, tuningConfig, metrics); + + sequenceName = String.valueOf(System.nanoTime()); } @After @@ -249,17 +259,25 @@ public void testPersistWithCommitMetadata() throws Exception final Object commitMetadata = "dummyCommitMetadata"; testPersist(commitMetadata); - plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, metrics); + plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(segmentAllocator, schema, tuningConfig, metrics); Assert.assertEquals(commitMetadata, plumber.startJob()); } private void testPersist(final Object commitMetadata) throws Exception { + Interval segmentInterval = Intervals.utc(0, TimeUnit.HOURS.toMillis(1)); + segmentIdentifier = new SegmentIdentifier( + schema.getDataSource(), + segmentInterval, + "version", + tuningConfig.getShardSpec() + ); + plumber.getSinks() .put( - 0L, + segmentIdentifier, new Sink( - Intervals.utc(0, TimeUnit.HOURS.toMillis(1)), + segmentInterval, schema, tuningConfig.getShardSpec(), DateTimes.of("2014-12-01T12:34:56.789").toString(), @@ -274,6 +292,11 @@ private void testPersist(final Object commitMetadata) throws Exception EasyMock.expect(row.getDimensions()).andReturn(new ArrayList()); EasyMock.replay(row); + EasyMock.expect( + segmentAllocator.allocate(EasyMock.anyObject(InputRow.class), EasyMock.anyString(), EasyMock.anyString()) + ).andReturn(segmentIdentifier); + EasyMock.replay(segmentAllocator); + final CountDownLatch doneSignal = new CountDownLatch(1); final Committer committer = new Committer() @@ -290,7 +313,7 @@ public void run() doneSignal.countDown(); } }; - plumber.add(row, Suppliers.ofInstance(committer)); + plumber.add(row, sequenceName, Suppliers.ofInstance(committer)); plumber.persist(committer); doneSignal.await(); @@ -302,11 +325,23 @@ public void run() @Test(timeout = 60000) public void testPersistFails() throws Exception { + Interval segmentInterval = Intervals.utc(0, TimeUnit.HOURS.toMillis(1)); + segmentIdentifier = new SegmentIdentifier( + schema.getDataSource(), + segmentInterval, + "version", + tuningConfig.getShardSpec() + ); + EasyMock.expect( + segmentAllocator.allocate(EasyMock.anyObject(InputRow.class), EasyMock.anyString(), EasyMock.anyString()) + ).andReturn(segmentIdentifier); + EasyMock.replay(segmentAllocator); + plumber.getSinks() .put( - 0L, + segmentIdentifier, new Sink( - Intervals.utc(0, TimeUnit.HOURS.toMillis(1)), + segmentInterval, schema, tuningConfig.getShardSpec(), DateTimes.of("2014-12-01T12:34:56.789").toString(), @@ -319,7 +354,7 @@ public void testPersistFails() 
throws Exception EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L); EasyMock.expect(row.getDimensions()).andReturn(new ArrayList()); EasyMock.replay(row); - plumber.add(row, Suppliers.ofInstance(Committers.nil())); + plumber.add(row, sequenceName, Suppliers.ofInstance(Committers.nil())); final CountDownLatch doneSignal = new CountDownLatch(1); @@ -358,10 +393,19 @@ private void testPersistHydrantGapsHelper(final Object commitMetadata) throws Ex { Interval testInterval = new Interval(DateTimes.of("1970-01-01"), DateTimes.of("1971-01-01")); - RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); + segmentIdentifier = new SegmentIdentifier(schema2.getDataSource(), testInterval, "test", NoneShardSpec.instance()); + + EasyMock.expect( + segmentAllocator.allocate(EasyMock.anyObject(InputRow.class), EasyMock.anyString(), EasyMock.anyString()) + ).andReturn( + segmentIdentifier + ); + EasyMock.replay(segmentAllocator); + + RealtimePlumber plumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber(segmentAllocator, schema2, tuningConfig, metrics); plumber2.getSinks() .put( - 0L, + segmentIdentifier, new Sink( testInterval, schema2, @@ -387,11 +431,11 @@ public void run() doneSignal.countDown(); } }; - plumber2.add(getTestInputRow("1970-01-01"), Suppliers.ofInstance(committer)); - plumber2.add(getTestInputRow("1970-02-01"), Suppliers.ofInstance(committer)); - plumber2.add(getTestInputRow("1970-03-01"), Suppliers.ofInstance(committer)); - plumber2.add(getTestInputRow("1970-04-01"), Suppliers.ofInstance(committer)); - plumber2.add(getTestInputRow("1970-05-01"), Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-01-01"), sequenceName, Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-02-01"), sequenceName, Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-03-01"), sequenceName, Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-04-01"), sequenceName, Suppliers.ofInstance(committer)); + plumber2.add(getTestInputRow("1970-05-01"), sequenceName, Suppliers.ofInstance(committer)); plumber2.persist(committer); @@ -410,17 +454,18 @@ public void run() FileUtils.deleteDirectory(new File(persistDir, "1")); FileUtils.deleteDirectory(new File(persistDir, "3")); RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber( + segmentAllocator, schema2, tuningConfig, metrics ); restoredPlumber.bootstrapSinksFromDisk(); - Map sinks = restoredPlumber.getSinks(); + Map sinks = restoredPlumber.getSinks(); Assert.assertEquals(1, sinks.size()); - List hydrants = Lists.newArrayList(sinks.get(new Long(0))); + List hydrants = Lists.newArrayList(sinks.get(segmentIdentifier)); DateTime startTime = DateTimes.of("1970-01-01T00:00:00.000Z"); Interval expectedInterval = new Interval(startTime, DateTimes.of("1971-01-01T00:00:00.000Z")); Assert.assertEquals(0, hydrants.get(0).getCount()); @@ -444,6 +489,7 @@ public void run() FileUtils.deleteDirectory(new File(persistDir, "2")); FileUtils.deleteDirectory(new File(persistDir, "4")); RealtimePlumber restoredPlumber2 = (RealtimePlumber) realtimePlumberSchool.findPlumber( + segmentAllocator, schema2, tuningConfig, metrics @@ -473,9 +519,20 @@ private void testDimOrderInheritanceHelper(final Object commitMetadata) throws E QueryableIndex qindex; FireHydrant hydrant; - Map sinks; + Map sinks; + + Interval testInterval = new Interval(DateTimes.of("1970-01-01"), DateTimes.of("1971-01-01")); + + segmentIdentifier = 
new SegmentIdentifier(schema2.getDataSource(), testInterval, "test", NoneShardSpec.instance()); + + EasyMock.expect( + segmentAllocator.allocate(EasyMock.anyObject(InputRow.class), EasyMock.anyString(), EasyMock.anyString()) + ).andReturn( + segmentIdentifier + ); + EasyMock.replay(segmentAllocator); - RealtimePlumber plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema2, tuningConfig, metrics); + RealtimePlumber plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(segmentAllocator, schema2, tuningConfig, metrics); Assert.assertNull(plumber.startJob()); final CountDownLatch doneSignal = new CountDownLatch(1); @@ -501,6 +558,7 @@ public void run() ImmutableList.of("dimD"), ImmutableList.of("1") ), + sequenceName, Suppliers.ofInstance(committer) ); plumber.add( @@ -509,6 +567,7 @@ public void run() ImmutableList.of("dimC"), ImmutableList.of("1") ), + sequenceName, Suppliers.ofInstance(committer) ); plumber.add( @@ -517,6 +576,7 @@ public void run() ImmutableList.of("dimA"), ImmutableList.of("1") ), + sequenceName, Suppliers.ofInstance(committer) ); plumber.add( @@ -525,6 +585,7 @@ public void run() ImmutableList.of("dimB"), ImmutableList.of("1") ), + sequenceName, Suppliers.ofInstance(committer) ); plumber.add( @@ -533,6 +594,7 @@ public void run() ImmutableList.of("dimE"), ImmutableList.of("1") ), + sequenceName, Suppliers.ofInstance(committer) ); plumber.add( @@ -541,6 +603,7 @@ public void run() ImmutableList.of("dimA", "dimB", "dimC", "dimD", "dimE"), ImmutableList.of("1") ), + sequenceName, Suppliers.ofInstance(committer) ); @@ -552,6 +615,7 @@ public void run() plumber.finishJob(); RealtimePlumber restoredPlumber = (RealtimePlumber) realtimePlumberSchool.findPlumber( + segmentAllocator, schema2, tuningConfig, metrics @@ -560,7 +624,7 @@ public void run() sinks = restoredPlumber.getSinks(); Assert.assertEquals(1, sinks.size()); - List hydrants = Lists.newArrayList(sinks.get(0L)); + List hydrants = Lists.newArrayList(sinks.get(segmentIdentifier)); for (int i = 0; i < hydrants.size(); i++) { hydrant = hydrants.get(i); diff --git a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java index b83a93bdbe9d..0664aec2f3bb 100644 --- a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java @@ -37,6 +37,7 @@ import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentMetrics; +import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.firehose.LocalFirehoseFactory; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; @@ -157,7 +158,7 @@ public void testTaskValidator() throws Exception { @Override public Plumber findPlumber( - DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics + SegmentAllocator segmentAllocator, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics ) { return null;
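
For reference, the SegmentAllocator contract that findPlumber now requires is small enough to satisfy in a few lines. Below is a minimal sketch mirroring SpecBasedSegmentAllocator and TestSegmentAllocator above; the class name FixedVersionSegmentAllocator and the fixed-version behavior are illustrative, not part of this patch. It uses only APIs that appear in the diff: the allocate(InputRow, String, String) signature, the SegmentIdentifier constructor, Granularity.bucketStart/increment, and LinearShardSpec.

    package io.druid.segment.realtime.appenderator; // same package only so the sketch can see SegmentIdentifier

    import io.druid.data.input.InputRow;
    import io.druid.java.util.common.granularity.Granularity;
    import io.druid.timeline.partition.LinearShardSpec;
    import org.joda.time.DateTime;
    import org.joda.time.Interval;

    import java.io.IOException;

    /**
     * Sketch: one segment per granularity bucket, fixed version, single linear shard.
     */
    public class FixedVersionSegmentAllocator implements SegmentAllocator
    {
      private final String dataSource;
      private final String version;
      private final Granularity granularity;

      public FixedVersionSegmentAllocator(String dataSource, String version, Granularity granularity)
      {
        this.dataSource = dataSource;
        this.version = version;
        this.granularity = granularity;
      }

      @Override
      public SegmentIdentifier allocate(InputRow row, String sequenceName, String previousSegmentId) throws IOException
      {
        // Truncate the row timestamp to its bucket and build the bucket interval,
        // exactly as SpecBasedSegmentAllocator.allocate does above.
        // sequenceName and previousSegmentId are unused in this fixed-version sketch.
        final DateTime intervalStart = granularity.bucketStart(row.getTimestamp());
        final Interval interval = new Interval(intervalStart, granularity.increment(intervalStart));
        return new SegmentIdentifier(dataSource, interval, version, new LinearShardSpec(0));
      }
    }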
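The caller-side wiring changes in step with the interface: findPlumber takes the allocator up front, and every add carries a sequenceName. A sketch under stated assumptions: the school, schema, tuning config, metrics, row, and committer supplier stand in for task-local state this patch leaves unchanged, the sequence name is arbitrary (RealtimeIndexTask passes getId()), and FixedVersionSegmentAllocator is the hypothetical class from the previous sketch.

    import com.google.common.base.Supplier;
    import io.druid.data.input.Committer;
    import io.druid.data.input.InputRow;
    import io.druid.java.util.common.granularity.Granularities;
    import io.druid.segment.incremental.IndexSizeExceededException;
    import io.druid.segment.indexing.DataSchema;
    import io.druid.segment.indexing.RealtimeTuningConfig;
    import io.druid.segment.realtime.FireDepartmentMetrics;
    import io.druid.segment.realtime.appenderator.FixedVersionSegmentAllocator;
    import io.druid.segment.realtime.appenderator.SegmentAllocator;
    import io.druid.segment.realtime.plumber.Plumber;
    import io.druid.segment.realtime.plumber.PlumberSchool;

    public class AllocatorWiringSketch
    {
      public static void ingestOne(
          PlumberSchool plumberSchool,
          DataSchema schema,
          RealtimeTuningConfig tuningConfig,
          FireDepartmentMetrics metrics,
          InputRow row,
          Supplier<Committer> committerSupplier
      ) throws IndexSizeExceededException
      {
        final SegmentAllocator segmentAllocator =
            new FixedVersionSegmentAllocator(schema.getDataSource(), "v1", Granularities.HOUR);

        // findPlumber now receives the allocator; versions are no longer derived inside the plumber.
        final Plumber plumber = plumberSchool.findPlumber(segmentAllocator, schema, tuningConfig, metrics);
        final String sequenceName = "task-id"; // RealtimeIndexTask passes its task id here

        plumber.startJob();
        // Positive return: summarized row count in the row's sink; -1: the row was thrown away.
        final int numRows = plumber.add(row, sequenceName, committerSupplier);
        plumber.persist(committerSupplier.get());
        plumber.finishJob();
      }
    }

Per the comment in Plumbers.addNextRow, IndexSizeExceededException should not occur for a single-threaded caller, so propagating it as fatal (as above) matches the existing behavior.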
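Finally, bootstrapSinksFromDisk now depends on JSON files written next to each persisted hydrant (identifier.json and segment-tracking.json). A sketch of the identifier round trip, assuming SegmentIdentifier keeps its existing Jackson bindings; sinkDir and identifier stand in for values RealtimePlumber already has in scope:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.druid.jackson.DefaultObjectMapper;
    import io.druid.segment.realtime.appenderator.SegmentIdentifier;

    import java.io.File;
    import java.io.IOException;

    public class IdentifierRoundTripSketch
    {
      public static SegmentIdentifier roundTrip(File sinkDir, SegmentIdentifier identifier) throws IOException
      {
        final ObjectMapper objectMapper = new DefaultObjectMapper();

        // Write side, as persistHydrant now does for every persisted hydrant's sink directory.
        final File identifierFile = new File(sinkDir, "identifier.json"); // IDENTIFIER_FILE_NAME
        objectMapper.writeValue(identifierFile, identifier);

        // Read side, as bootstrapSinksFromDisk does: a directory without this file is not a sink.
        if (!identifierFile.isFile()) {
          return null;
        }
        return objectMapper.readValue(identifierFile, SegmentIdentifier.class);
      }
    }

The restored identifier's getInterval(), getVersion(), and getShardSpec() rebuild the Sink with exactly the identity it was allocated under, which is what lets the bootstrap path drop the old versioningPolicy.getVersion recomputation.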