diff --git a/docs/content/configuration/coordinator.md b/docs/content/configuration/coordinator.md index 1461953bf0e4..1c98496125fe 100644 --- a/docs/content/configuration/coordinator.md +++ b/docs/content/configuration/coordinator.md @@ -25,7 +25,7 @@ The coordinator node uses several of the global configs in [Configuration](../co |`druid.coordinator.period`|The run period for the coordinator. The coordinator’s operates by maintaining the current state of the world in memory and periodically looking at the set of segments available and segments being served to make decisions about whether any changes need to be made to the data topology. This property sets the delay between each of these runs.|PT60S| |`druid.coordinator.period.indexingPeriod`|How often to send indexing tasks to the indexing service. Only applies if merge or conversion is turned on.|PT1800S (30 mins)| |`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs, the current ZK interaction code, however, is written in a way that doesn’t allow the Coordinator to know for a fact that it’s done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|PT300S| -|`druid.coordinator.merge.on`|Boolean flag for whether or not the coordinator should try and merge small segments into a more optimal segment size.|false| +|`druid.coordinator.merge.strategy`|The merge strategy to be used to merge segments. See [Merge Segments](../ingestion/merge-segments.html).|null| |`druid.coordinator.conversion.on`|Boolean flag for converting old segment indexing versions to the latest segment indexing version.|false| |`druid.coordinator.load.timeout`|The timeout duration for when the coordinator assigns a segment to a historical node.|PT15M| |`druid.coordinator.kill.on`|Boolean flag for whether or not the coordinator should submit kill task for unused segments, that is, hard delete them from metadata store and deep storage. If set to true, then for all the whitelisted dataSources, coordinator will submit tasks periodically based on `period` specified. These kill tasks will delete all segments except for the last `durationToRetain` period. Whitelist can be set via dynamic configuration `killDataSourceWhitelist` described later.|false| @@ -72,7 +72,40 @@ A sample coordinator dynamic config JSON object is shown below: "replicantLifetime": 15, "replicationThrottleLimit": 10, "emitBalancingStats": false, - "killDataSourceWhitelist": ["wikipedia", "testDatasource"] + "killDataSourceWhitelist": ["wikipedia", "testDatasource"], + "hadoopMergeConfig": { + "keepGap": true, + "hadoopDependencyCoordinates": null, + "tuningConfig": null, + "hadoopMergeSpecs": [ + { + "dataSource": "wikipedia", + "queryGranularity": "DAY", + "dimensions": ["language"], + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + } + ] + } + ] + } } ``` @@ -87,7 +120,8 @@ Issuing a GET request at the same URL will return the spec that is currently in |`replicantLifetime`|The maximum number of coordinator runs for a segment to be replicated before we start alerting.|15| |`replicationThrottleLimit`|The maximum number of segments that can be replicated at one time.|10| |`emitBalancingStats`|Boolean flag for whether or not we should emit balancing stats. This is an expensive operation.|false| -|`killDataSourceWhitelist`|List of dataSources for which kill tasks are sent if property `druid.coordinator.kill.on` is true.|none| +|`killDataSourceWhitelist`|List of dataSources for which kill tasks are sent if property `druid.coordinator.kill.on` is true.|null| +|`hadoopMergeConfig`|Used by Hadoop segments automatic merging strategy. It contains information about how to finds imbalanced segments, and how do to Hadoop reindexing|null| To view the audit history of coordinator dynamic config issue a GET request to the URL - diff --git a/docs/content/ingestion/merge-segments.md b/docs/content/ingestion/merge-segments.md new file mode 100644 index 000000000000..bac2b58f6568 --- /dev/null +++ b/docs/content/ingestion/merge-segments.md @@ -0,0 +1,93 @@ +--- +layout: doc_page +--- +# Merge Existing Segments + +Druid can automatically merge small segments into a single segment that has a more optimal segment size. You can turn that on by specifying a merge +strategy at `druid.coordinator.merge.strategy` (See [Coordinator Configuration](../configuration/coordinator.html)). +Currently there are two merge strategies you can choose for Druid. + +#### `append` +This simply sends an [Append Task](../ingestion/tasks.html) with segments that need to be merged. To help Druid decide what segments need to be merged, +user can set appropriate dynamic configurations values for `mergeBytesLimit` and `mergeSegmentsLimit` (See [Coordinator Dynamic Configuration]("../configuration/coordinator.html#dynamic-configuration").). +Note that Append task will only merge segments that have a single shard. + + + +#### `hadoop` +This sends a Hadoop Reindex Task to actually reindex the intervals covered by segments that have imbalanced sizes. Behind the scene, Coordinator will periodically +find segments whose sizes are small. Once it finds enough such segments whose current total size is greater than the threshold specified by `mergeBytesLimit` +(See [Coordinator Dynamic Configuration]("../configuration/coordinator.html#dynamic-configuration")), Coordinator will submit a Hadoop Reindex Task to reindex +those small segments. + +To enable this, user will need to post +a `hadoopMergeConfig`, along with other dynamic configurations to [Coordinator Dynamic Configuration end point]("../configuration/coordinator.html#dynamic-configuration"). +Optionally, user can also configure it in Coordinator console. + +`hadoopMergeConfig` provides information about how Coordinator finds imbalanced segments, and how do to Hadoop reindexing. + +Here is what goes inside `hadoopMergeConfig`, + +|Field|Type|Description|Default|Required| +|-----|----|-----------|-------|--------| +|keepGap|Boolean.|Indicate whether Druid should merge segments whose intervals are non-contiguous. For example, segment A has interval `2016-03-22/2016-03-23`, segment B has interval `2016-03-24/2016-03-25`. If `keepGap` is true, Druid will not merge A and B.|false|no| +|hadoopDependencyCoordinates|Array of String.|A list of Hadoop dependency coordinates that Druid will use, this property will override the default Hadoop coordinates. Once specified, Druid will look for those Hadoop dependencies from the location specified by `druid.extensions.hadoopDependenciesDir`|null|no| +|tuningConfig|JSON Object.|This is exactly same as the tuningConfig specified in Hadoop Index Task. See [TuningConfig](../ingestion/batch-ingestion.html#tuningconfig).|null|no| +|hadoopMergeSpecs|Array of HadoopMergeSpec.|A list of HadoopMergeSpec. Each data source will have its own HadoopMergeSpec. See below.|null|no| + +For each data source on which you want to enable automatic merge, you would need to provide a `HadoopMergeSpec` so that Druid knows how to reindex those segments. +Here is what goes inside `HadoopMergeSpec` + +|Field|Type|Description|Default|Required| +|-----|----|-----------|-------|--------| +|dataSource|String.|The data source of segments that will be automatically merged.|Value has to be provided by user.|yes| +|queryGranularity|String.|Defines the granularity of the query while loading data. See [Granularities](../querying/granularities.html).|NONE|no| +|dimensions|Array of String.|Name of dimension columns to load.|The super set of dimensions in the existing segments.|no| +|metricsSpec|Array of aggregators.|A list of [aggregators](../querying/aggregations.html).|Value has to be provided by user.|yes| + +Example: + +```json +{ + ... other Coordinator dynamic configs... + "mergeBytesLimit": 500000000, + "hadoopMergeConfig": { + "keepGap": true, + "hadoopDependencyCoordinates": null, + "tuningConfig": null, + "hadoopMergeSpecs": [ + { + "dataSource": "wikipedia", + "queryGranularity": "DAY", + "dimensions": ["language"], + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + } + ] + } + ] + } +} +``` + +With this configuration posted to [Coordinator Dynamic Configuration end point]("../configuration/coordinator.html#dynamic-configuration"), +Coordinator will automatically merge small segments whose data sources are "wikipedia". Once it finds enough small segments whose total +uncompressed size is greater or equal than `mergeBytesLimit`, it will submit a Hadoop Index Task to reindex intervals +covered by those imbalanced segments, using the dimensions specified in `dimensions` (if not specified, it will use the `dimensions` in the existing segments) and aggregators specified in `metricsSpec`. diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIOConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIOConfig.java index 24d40b9ad672..fcb5542e12be 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIOConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopIOConfig.java @@ -70,4 +70,39 @@ public HadoopIOConfig withSegmentOutputPath(String path) { return new HadoopIOConfig(pathSpec, metadataUpdateSpec, path); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + HadoopIOConfig ioConfig = (HadoopIOConfig) o; + + if (pathSpec != null ? !pathSpec.equals(ioConfig.pathSpec) : ioConfig.pathSpec != null) { + return false; + } + if (metadataUpdateSpec != null + ? !metadataUpdateSpec.equals(ioConfig.metadataUpdateSpec) + : ioConfig.metadataUpdateSpec != null) { + return false; + } + return segmentOutputPath != null + ? segmentOutputPath.equals(ioConfig.segmentOutputPath) + : ioConfig.segmentOutputPath == null; + + } + + @Override + public int hashCode() + { + int result = pathSpec != null ? pathSpec.hashCode() : 0; + result = 31 * result + (metadataUpdateSpec != null ? metadataUpdateSpec.hashCode() : 0); + result = 31 * result + (segmentOutputPath != null ? segmentOutputPath.hashCode() : 0); + return result; + } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index 8a008ed2f7a9..b61873f4b8b8 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -279,4 +279,75 @@ public HadoopTuningConfig withShardSpecs(Map> s numBackgroundPersistThreads ); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + HadoopTuningConfig that = (HadoopTuningConfig) o; + + if (rowFlushBoundary != that.rowFlushBoundary) { + return false; + } + if (leaveIntermediate != that.leaveIntermediate) { + return false; + } + if (overwriteFiles != that.overwriteFiles) { + return false; + } + if (ignoreInvalidRows != that.ignoreInvalidRows) { + return false; + } + if (combineText != that.combineText) { + return false; + } + if (useCombiner != that.useCombiner) { + return false; + } + if (workingPath != null ? !workingPath.equals(that.workingPath) : that.workingPath != null) { + return false; + } + if (version != null ? !version.equals(that.version) : that.version != null) { + return false; + } + if (partitionsSpec != null ? !partitionsSpec.equals(that.partitionsSpec) : that.partitionsSpec != null) { + return false; + } + if (shardSpecs != null ? !shardSpecs.equals(that.shardSpecs) : that.shardSpecs != null) { + return false; + } + if (indexSpec != null ? !indexSpec.equals(that.indexSpec) : that.indexSpec != null) { + return false; + } + if (cleanupOnFailure != null ? !cleanupOnFailure.equals(that.cleanupOnFailure) : that.cleanupOnFailure != null) { + return false; + } + return !(jobProperties != null ? !jobProperties.equals(that.jobProperties) : that.jobProperties != null); + + } + + @Override + public int hashCode() + { + int result = workingPath != null ? workingPath.hashCode() : 0; + result = 31 * result + (version != null ? version.hashCode() : 0); + result = 31 * result + (partitionsSpec != null ? partitionsSpec.hashCode() : 0); + result = 31 * result + (shardSpecs != null ? shardSpecs.hashCode() : 0); + result = 31 * result + (indexSpec != null ? indexSpec.hashCode() : 0); + result = 31 * result + rowFlushBoundary; + result = 31 * result + (leaveIntermediate ? 1 : 0); + result = 31 * result + (cleanupOnFailure != null ? cleanupOnFailure.hashCode() : 0); + result = 31 * result + (overwriteFiles ? 1 : 0); + result = 31 * result + (ignoreInvalidRows ? 1 : 0); + result = 31 * result + (jobProperties != null ? jobProperties.hashCode() : 0); + result = 31 * result + (combineText ? 1 : 0); + result = 31 * result + (useCombiner ? 1 : 0); + return result; + } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java index 09d20c6524b6..ea02b2beccd2 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/DatasourceInputFormat.java @@ -146,7 +146,8 @@ public org.apache.hadoop.mapred.InputFormat get() //and not consider the splitting. //also without this, isSplitable(..) fails with NPE because compressionCodecs is not properly setup. @Override - protected boolean isSplitable(FileSystem fs, Path file) { + protected boolean isSplitable(FileSystem fs, Path file) + { return false; } }; diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java index 4063f332f12e..d6df3fcab575 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/path/DatasourcePathSpec.java @@ -67,7 +67,7 @@ public DatasourcePathSpec( if (maxSplitSize == null) { this.maxSplitSize = 0; } else { - this.maxSplitSize = maxSplitSize.longValue(); + this.maxSplitSize = maxSplitSize; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 28f177f602c8..3dcf7c52e21c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -79,7 +79,6 @@ private static String getTheDataSource(HadoopIngestionSpec spec) * IndexGeneratorJob.getPublishedSegments() to simply return a list of the published * segments, and let the indexing service report these segments to the database. */ - @JsonCreator public HadoopIndexTask( @JsonProperty("id") String id, diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 706036e5e6f2..c4b6e6090416 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -53,6 +53,7 @@ import org.joda.time.DateTime; import org.joda.time.Interval; +import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; @@ -377,6 +378,33 @@ public Collection apply(TaskRunner taskRunner) ); } + @GET + @Path("/incompleteTasks") + @Produces(MediaType.APPLICATION_JSON) + public Response getIncompleteTasks() + { + final List incompleteTasks = Lists.transform( + taskStorageQueryAdapter.getActiveTasks(), + new Function() + { + @Nullable + @Override + public TaskResponseObject apply(Task task) + { + return new TaskResponseObject( + task.getId(), + new DateTime(0), + new DateTime(0), + Optional.absent(), + TaskLocation.unknown() + ); + } + } + ); + + return Response.ok(incompleteTasks).build(); + } + @GET @Path("/completeTasks") @Produces(MediaType.APPLICATION_JSON) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index abc6fc8b4d92..d82304fb69ba 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -21,16 +21,21 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.metamx.common.Granularity; -import io.druid.client.indexing.ClientAppendQuery; -import io.druid.client.indexing.ClientKillQuery; -import io.druid.client.indexing.ClientMergeQuery; +import io.druid.client.indexing.ClientAppendTask; +import io.druid.client.indexing.ClientHadoopIndexTask; +import io.druid.client.indexing.ClientKillTask; +import io.druid.client.indexing.ClientMergeTask; import io.druid.granularity.QueryGranularity; import io.druid.guice.FirehoseModule; import io.druid.indexer.HadoopIOConfig; import io.druid.indexer.HadoopIngestionSpec; +import io.druid.indexer.HadoopTuningConfig; import io.druid.indexing.common.TestUtils; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; @@ -40,6 +45,7 @@ import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.granularity.GranularitySpec; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.FireDepartment; import io.druid.segment.realtime.FireDepartmentMetrics; @@ -53,6 +59,7 @@ import org.junit.Assert; import org.junit.Test; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.List; @@ -198,7 +205,7 @@ public void testMergeTaskSerde() throws Exception task2.getAggregators().get(0).getName() ); - final MergeTask task3 = (MergeTask) jsonMapper.readValue(jsonMapper.writeValueAsString(new ClientMergeQuery( + final MergeTask task3 = (MergeTask) jsonMapper.readValue(jsonMapper.writeValueAsString(new ClientMergeTask( "foo", segments, aggregators @@ -233,7 +240,7 @@ public void testKillTaskSerde() throws Exception Assert.assertEquals(task.getDataSource(), task2.getDataSource()); Assert.assertEquals(task.getInterval(), task2.getInterval()); - final KillTask task3 = (KillTask) jsonMapper.readValue(jsonMapper.writeValueAsString(new ClientKillQuery( + final KillTask task3 = (KillTask) jsonMapper.readValue(jsonMapper.writeValueAsString(new ClientKillTask( "foo", new Interval("2010-01-01/P1D") )), Task.class); @@ -417,7 +424,7 @@ public void testAppendTaskSerde() throws Exception Assert.assertEquals(task.getInterval(), task2.getInterval()); Assert.assertEquals(task.getSegments(), task2.getSegments()); - final AppendTask task3 = (AppendTask) jsonMapper.readValue(jsonMapper.writeValueAsString(new ClientAppendQuery( + final AppendTask task3 = (AppendTask) jsonMapper.readValue(jsonMapper.writeValueAsString(new ClientAppendTask( "foo", segments )), Task.class); @@ -613,4 +620,122 @@ public void testHadoopIndexTaskSerde() throws Exception Assert.assertEquals("blah", task.getClasspathPrefix()); Assert.assertEquals("blah", task2.getClasspathPrefix()); } + + @Test + public void testHadoopIndexTaskSerdeWithClientHadoopQuery() throws Exception + { + final String expectedTaskId = "coordinator_index_task_1234"; + final String expectedDataSource = "wikipedia"; + + final List expectedIntervalList = ImmutableList.of( + new Interval("2012-01-01/2012-01-03"), + new Interval("2012-01-05/2012-01-08"), + new Interval("2012-01-10/2012-01-14") + ); + + final List expectedDimensions = ImmutableList.of("a", "b", "c"); + + final AggregatorFactory[] expectedAggregators = new AggregatorFactory[]{ + new CountAggregatorFactory("name"), + new DoubleSumAggregatorFactory( + "added", + "added" + ), + new DoubleSumAggregatorFactory( + "deleted", + "deleted" + ), + new DoubleSumAggregatorFactory( + "delta", + "delta" + ) + }; + + final HadoopTuningConfig expectedTuningConfig = jsonMapper.convertValue(ImmutableMap.builder() + .put("version", "v1") + .put("type", "hadoop") + .put( + "partitionSpec", + ImmutableMap.of( + "assumeGrouped", + true, + "targetPartitionSize", + 1000, + "type", + "hashed" + ) + ) + .put("rowFlushBoundary", 10000) + .build(), HadoopTuningConfig.class); + + final ClientHadoopIndexTask clientHadoopIndexTask = new ClientHadoopIndexTask( + expectedTaskId, + expectedDataSource, + expectedIntervalList, + expectedAggregators, + expectedDimensions, + QueryGranularity.DAY, + ImmutableMap.builder() + .put("version", "v1") + .put("type", "hadoop") + .put( + "partitionSpec", + ImmutableMap.of( + "assumeGrouped", + true, + "targetPartitionSize", + 1000, + "type", + "hashed" + ) + ) + .put("rowFlushBoundary", 10000) + .build(), + null, + jsonMapper + ); + + final HadoopIOConfig expectedIOConfig = new HadoopIOConfig(ImmutableMap.of( + "type", + "dataSource", + "ingestionSpec", + // Due to that HadoopIOConfig takes a Map instead of a PathSpec in its JsonCreate constructor, + // the list of intervals has to be in string format here. + ImmutableMap.of( + "dataSource", expectedDataSource, + "intervals", Lists.transform( + expectedIntervalList, + new Function() + { + @Nullable + @Override + public String apply(@Nullable Interval input) + { + return input.toString(); + } + }), + "dimensions", expectedDimensions + ) + ), null, null); + + final HadoopIndexTask hadoopIndexTask = jsonMapper.readValue( + jsonMapper.writeValueAsString(clientHadoopIndexTask), + HadoopIndexTask.class + ); + + final HadoopIngestionSpec actualIngestionSpec = hadoopIndexTask.getSpec(); + final DataSchema actualDataSchema = actualIngestionSpec.getDataSchema(); + final GranularitySpec actualGranularitySpec = actualIngestionSpec.getDataSchema().getGranularitySpec(); + final HadoopTuningConfig actualTuningConfig = hadoopIndexTask.getSpec().getTuningConfig(); + final HadoopIOConfig actualIOConfig = hadoopIndexTask.getSpec().getIOConfig(); + + Assert.assertEquals(expectedDataSource, hadoopIndexTask.getDataSource()); + Assert.assertEquals(expectedDataSource, actualDataSchema.getDataSource()); + Assert.assertEquals(expectedTaskId, hadoopIndexTask.getId()); + Assert.assertEquals(QueryGranularity.DAY, actualGranularitySpec.getQueryGranularity()); + Assert.assertEquals(Sets.newHashSet(expectedIntervalList), actualGranularitySpec.bucketIntervals().get()); + Assert.assertArrayEquals(expectedAggregators, actualDataSchema.getAggregators()); + Assert.assertEquals(expectedTuningConfig, actualTuningConfig); + Assert.assertEquals(expectedIOConfig, actualIOConfig); + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java index 5ef4fd3c8c03..c41f9c934996 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordResourceTest.java @@ -223,11 +223,17 @@ public void testOverlordResource() throws Exception ((TaskStatus) ((Map) response.getEntity()).get("status")).getStatusCode() ); + response = overlordResource.getIncompleteTasks(); + Assert.assertEquals(1, ((List) response.getEntity()).size()); + // Simulate completion of task_0 taskCompletionCountDownLatches[Integer.parseInt(taskId_0)].countDown(); // Wait for taskQueue to handle success status of task_0 waitForTaskStatus(taskId_0, TaskStatus.Status.SUCCESS); + response = overlordResource.getIncompleteTasks(); + Assert.assertEquals(0, ((List) response.getEntity()).size()); + // Manually insert task in taskStorage // Verifies sync from storage final String taskId_1 = "1"; @@ -244,6 +250,9 @@ public void testOverlordResource() throws Exception Assert.assertEquals(taskId_1, taskResponseObject.toJson().get("id")); Assert.assertEquals(TASK_LOCATION, taskResponseObject.toJson().get("location")); + response = overlordResource.getIncompleteTasks(); + Assert.assertEquals(1, ((List) response.getEntity()).size()); + // Simulate completion of task_1 taskCompletionCountDownLatches[Integer.parseInt(taskId_1)].countDown(); // Wait for taskQueue to handle success status of task_1 @@ -252,6 +261,10 @@ public void testOverlordResource() throws Exception // should return number of tasks which are not in running state response = overlordResource.getCompleteTasks(); Assert.assertEquals(2, (((List) response.getEntity()).size())); + + response = overlordResource.getIncompleteTasks(); + Assert.assertEquals(0, ((List) response.getEntity()).size()); + taskMaster.stop(); Assert.assertFalse(taskMaster.isLeading()); EasyMock.verify(taskLockbox, taskActionClientFactory); diff --git a/server/src/main/java/io/druid/client/indexing/ClientAppendQuery.java b/server/src/main/java/io/druid/client/indexing/ClientAppendTask.java similarity index 96% rename from server/src/main/java/io/druid/client/indexing/ClientAppendQuery.java rename to server/src/main/java/io/druid/client/indexing/ClientAppendTask.java index 4264bacd2f71..cfa750e2ee18 100644 --- a/server/src/main/java/io/druid/client/indexing/ClientAppendQuery.java +++ b/server/src/main/java/io/druid/client/indexing/ClientAppendTask.java @@ -27,13 +27,13 @@ /** */ -public class ClientAppendQuery +public class ClientAppendTask { private final String dataSource; private final List segments; @JsonCreator - public ClientAppendQuery( + public ClientAppendTask( @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List segments ) diff --git a/server/src/main/java/io/druid/client/indexing/ClientConversionQuery.java b/server/src/main/java/io/druid/client/indexing/ClientConversionTask.java similarity index 94% rename from server/src/main/java/io/druid/client/indexing/ClientConversionQuery.java rename to server/src/main/java/io/druid/client/indexing/ClientConversionTask.java index 1fd27da70c55..967411185ce0 100644 --- a/server/src/main/java/io/druid/client/indexing/ClientConversionQuery.java +++ b/server/src/main/java/io/druid/client/indexing/ClientConversionTask.java @@ -25,13 +25,13 @@ /** */ -public class ClientConversionQuery +public class ClientConversionTask { private final String dataSource; private final Interval interval; private final DataSegment segment; - public ClientConversionQuery( + public ClientConversionTask( DataSegment segment ) { @@ -40,7 +40,7 @@ public ClientConversionQuery( this.segment = segment; } - public ClientConversionQuery( + public ClientConversionTask( String dataSource, Interval interval ) diff --git a/server/src/main/java/io/druid/client/indexing/ClientHadoopIOConfig.java b/server/src/main/java/io/druid/client/indexing/ClientHadoopIOConfig.java new file mode 100644 index 000000000000..b3aa3affe03c --- /dev/null +++ b/server/src/main/java/io/druid/client/indexing/ClientHadoopIOConfig.java @@ -0,0 +1,50 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.indexing; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.segment.indexing.IOConfig; + +import java.util.Map; + +/** + */ +public class ClientHadoopIOConfig +{ + private final Map pathSpec; + + public ClientHadoopIOConfig(final @JsonProperty("inputSpec") Map pathSpec) + { + this.pathSpec = pathSpec; + } + + @JsonProperty + public String getType() + { + return "hadoop"; + } + + @JsonProperty("inputSpec") + public Map getPathSpec() + { + return pathSpec; + } + +} diff --git a/server/src/main/java/io/druid/client/indexing/ClientHadoopIndexTask.java b/server/src/main/java/io/druid/client/indexing/ClientHadoopIndexTask.java new file mode 100644 index 000000000000..333c1dc77d85 --- /dev/null +++ b/server/src/main/java/io/druid/client/indexing/ClientHadoopIndexTask.java @@ -0,0 +1,117 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.indexing; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; +import io.druid.data.input.impl.NoopInputRowParser; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec; +import org.joda.time.Interval; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + */ +public class ClientHadoopIndexTask +{ + private final String id; + private final List hadoopDependencyCoordinates; + private final ClientHadoopIngestionSpec hadoopIngestionSpec; + + public ClientHadoopIndexTask( + String id, + String dataSource, + List intervalsToReindex, + AggregatorFactory[] aggregators, + List dimensions, + QueryGranularity queryGranularity, + Map tuningConfig, + List hadoopDependencyCoordinates, + ObjectMapper mapper + ) + { + this.id = id; + this.hadoopDependencyCoordinates = hadoopDependencyCoordinates; + + final Map datasourceIngestionSpec = new HashMap<>(); + datasourceIngestionSpec.put("dataSource", dataSource); + datasourceIngestionSpec.put("intervals", intervalsToReindex); + if (dimensions != null) { + datasourceIngestionSpec.put("dimensions", dimensions); + } + + this.hadoopIngestionSpec = new ClientHadoopIngestionSpec( + new DataSchema( + dataSource, + mapper.>convertValue( + new NoopInputRowParser(null), + new TypeReference>() + { + } + ), + aggregators, + new ArbitraryGranularitySpec( + queryGranularity == null ? QueryGranularity.NONE : queryGranularity, + intervalsToReindex + ), + mapper + ), + new ClientHadoopIOConfig(ImmutableMap.of( + "type", + "dataSource", + "ingestionSpec", + datasourceIngestionSpec + )), + tuningConfig + ); + } + + @JsonProperty + public String getType() + { + return "index_hadoop"; + } + + @JsonProperty + public String getId() + { + return id; + } + + @JsonProperty + public List getHadoopDependencyCoordinates() + { + return hadoopDependencyCoordinates; + } + + @JsonProperty("spec") + public ClientHadoopIngestionSpec getHadoopIngestionSpec() + { + return hadoopIngestionSpec; + } + +} diff --git a/server/src/main/java/io/druid/client/indexing/ClientHadoopIngestionSpec.java b/server/src/main/java/io/druid/client/indexing/ClientHadoopIngestionSpec.java new file mode 100644 index 000000000000..f8c18a01b0ff --- /dev/null +++ b/server/src/main/java/io/druid/client/indexing/ClientHadoopIngestionSpec.java @@ -0,0 +1,66 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.indexing; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.segment.indexing.DataSchema; + +import java.util.Map; + +/** + */ +public class ClientHadoopIngestionSpec +{ + + private final DataSchema dataSchema; + private final ClientHadoopIOConfig ioConfig; + private final Map tuningConfig; + + public ClientHadoopIngestionSpec( + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("ioConfig") ClientHadoopIOConfig ioConfig, + @JsonProperty("tuningConfig") Map tuningConfig + ) + { + + this.dataSchema = dataSchema; + this.ioConfig = ioConfig; + this.tuningConfig = tuningConfig; + } + + @JsonProperty("dataSchema") + public DataSchema getDataSchema() + { + return dataSchema; + } + + @JsonProperty("ioConfig") + public ClientHadoopIOConfig getIOConfig() + { + return ioConfig; + } + + @JsonProperty("tuningConfig") + public Map getTuningConfig() + { + return tuningConfig; + } + +} diff --git a/server/src/main/java/io/druid/client/indexing/ClientKillQuery.java b/server/src/main/java/io/druid/client/indexing/ClientKillTask.java similarity index 96% rename from server/src/main/java/io/druid/client/indexing/ClientKillQuery.java rename to server/src/main/java/io/druid/client/indexing/ClientKillTask.java index 07fdbf5696f6..8405a4215944 100644 --- a/server/src/main/java/io/druid/client/indexing/ClientKillQuery.java +++ b/server/src/main/java/io/druid/client/indexing/ClientKillTask.java @@ -25,13 +25,13 @@ /** */ -public class ClientKillQuery +public class ClientKillTask { private final String dataSource; private final Interval interval; @JsonCreator - public ClientKillQuery( + public ClientKillTask( @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval ) diff --git a/server/src/main/java/io/druid/client/indexing/ClientMergeQuery.java b/server/src/main/java/io/druid/client/indexing/ClientMergeTask.java similarity index 97% rename from server/src/main/java/io/druid/client/indexing/ClientMergeQuery.java rename to server/src/main/java/io/druid/client/indexing/ClientMergeTask.java index fab3f459a9d5..deee774157c1 100644 --- a/server/src/main/java/io/druid/client/indexing/ClientMergeQuery.java +++ b/server/src/main/java/io/druid/client/indexing/ClientMergeTask.java @@ -28,14 +28,14 @@ /** */ -public class ClientMergeQuery +public class ClientMergeTask { private final String dataSource; private final List segments; private final List aggregators; @JsonCreator - public ClientMergeQuery( + public ClientMergeTask( @JsonProperty("dataSource") String dataSource, @JsonProperty("segments") List segments, @JsonProperty("aggregations") List aggregators diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java index 0e1e01e15812..4866b74b900a 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java @@ -19,31 +19,41 @@ package io.druid.client.indexing; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.inject.Inject; import com.metamx.common.IAE; import com.metamx.common.ISE; +import com.metamx.common.logger.Logger; import com.metamx.http.client.HttpClient; import com.metamx.http.client.Request; import com.metamx.http.client.response.InputStreamResponseHandler; import io.druid.client.selector.Server; import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.granularity.QueryGranularity; import io.druid.guice.annotations.Global; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.server.coordinator.helper.DruidCoordinatorHadoopSegmentMerger; import io.druid.timeline.DataSegment; import org.jboss.netty.handler.codec.http.HttpMethod; +import org.joda.time.DateTime; import org.joda.time.Interval; import javax.ws.rs.core.MediaType; +import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.net.URL; import java.util.Iterator; import java.util.List; +import java.util.Map; public class IndexingServiceClient { + private static final Logger log = new Logger(IndexingServiceClient.class); private static final InputStreamResponseHandler RESPONSE_HANDLER = new InputStreamResponseHandler(); + private final static String INCOMPLETE_TASKS = "incompleteTasks"; private final HttpClient client; private final ObjectMapper jsonMapper; @@ -76,25 +86,100 @@ public void mergeSegments(List segments) } } - runQuery(new ClientAppendQuery(dataSource, segments)); + postTask(new ClientAppendTask(dataSource, segments)); + } + + public String hadoopMergeSegments( + String dataSource, + List intervalsToReindex, + AggregatorFactory[] aggregators, + QueryGranularity queryGranularity, + List dimensions, + Map tuningConfig, + List hadoopCoordinates + ) + { + final InputStream queryResponse = postTask( + new ClientHadoopIndexTask( + String.format( + "%s_%s_%s", + DruidCoordinatorHadoopSegmentMerger.HADOOP_REINDEX_TASK_ID_PREFIX, dataSource, new DateTime() + ), + dataSource, + intervalsToReindex, + aggregators, + dimensions, + queryGranularity, + tuningConfig, + hadoopCoordinates, + jsonMapper + ) + ); + + try { + return jsonMapper.>readValue( + queryResponse, new TypeReference>() + { + } + ).get("task"); + } + catch (IOException e) { + log.warn( + "Error pasring HadoopReindexTask ID for dataSource [%s] and Interval [%s]", + dataSource, + intervalsToReindex + ); + return null; + } } public void killSegments(String dataSource, Interval interval) { - runQuery(new ClientKillQuery(dataSource, interval)); + postTask(new ClientKillTask(dataSource, interval)); } public void upgradeSegment(DataSegment dataSegment) { - runQuery(new ClientConversionQuery(dataSegment)); + postTask(new ClientConversionTask(dataSegment)); } public void upgradeSegments(String dataSource, Interval interval) { - runQuery(new ClientConversionQuery(dataSource, interval)); + postTask(new ClientConversionTask(dataSource, interval)); } - private InputStream runQuery(Object queryObject) + public List> getIncompleteTasks() + { + final InputStream queryResponse = runGetQuery(INCOMPLETE_TASKS); + try { + return jsonMapper.readValue( + queryResponse, new TypeReference>>() + { + } + ); + } + catch (IOException e) { + log.warn(e, "Error parsing response for incompleteTasks query from Overlord [%s]", baseUrl()); + throw Throwables.propagate(e); + } + } + + private InputStream runGetQuery(String path) + { + try { + return client.go( + new Request( + HttpMethod.GET, + new URL(String.format("%s/%s", baseUrl(), path)) + ), RESPONSE_HANDLER + ).get(); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + + private InputStream postTask(Object queryObject) { try { return client.go( @@ -132,4 +217,5 @@ private String baseUrl() throw Throwables.propagate(e); } } + } diff --git a/server/src/main/java/io/druid/segment/indexing/DataSchema.java b/server/src/main/java/io/druid/segment/indexing/DataSchema.java index 1f62dc151564..f665f4cf1ba7 100644 --- a/server/src/main/java/io/druid/segment/indexing/DataSchema.java +++ b/server/src/main/java/io/druid/segment/indexing/DataSchema.java @@ -61,14 +61,14 @@ public DataSchema( @JacksonInject ObjectMapper jsonMapper ) { - this.jsonMapper = Preconditions.checkNotNull(jsonMapper, "null ObjectMapper."); this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource cannot be null. Please provide a dataSource."); + this.jsonMapper = jsonMapper; this.parser = parser; - if (aggregators.length == 0) { + this.aggregators = aggregators == null ? new AggregatorFactory[]{} : aggregators; + if (this.aggregators.length == 0) { log.warn("No metricsSpec has been specified. Are you sure this is what you want?"); } - this.aggregators = aggregators; if (granularitySpec == null) { log.warn("No granularitySpec has been specified. Using UniformGranularitySpec as default."); @@ -93,7 +93,7 @@ public Map getParserMap() @JsonIgnore public InputRowParser getParser() { - if(parser == null) { + if (parser == null) { log.warn("No parser has been specified"); return null; } diff --git a/server/src/main/java/io/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/io/druid/server/coordinator/CoordinatorDynamicConfig.java index 62f07d8474d5..373700485466 100644 --- a/server/src/main/java/io/druid/server/coordinator/CoordinatorDynamicConfig.java +++ b/server/src/main/java/io/druid/server/coordinator/CoordinatorDynamicConfig.java @@ -36,6 +36,7 @@ public class CoordinatorDynamicConfig private final int balancerComputeThreads; private final boolean emitBalancingStats; private final Set killDataSourceWhitelist; + private final DruidCoordinatorHadoopMergeConfig hadoopMergeConfig; @JsonCreator public CoordinatorDynamicConfig( @@ -47,7 +48,8 @@ public CoordinatorDynamicConfig( @JsonProperty("replicationThrottleLimit") int replicationThrottleLimit, @JsonProperty("balancerComputeThreads") int balancerComputeThreads, @JsonProperty("emitBalancingStats") boolean emitBalancingStats, - @JsonProperty("killDataSourceWhitelist") Set killDataSourceWhitelist + @JsonProperty("killDataSourceWhitelist") Set killDataSourceWhitelist, + @JsonProperty("hadoopMergeConfig") DruidCoordinatorHadoopMergeConfig hadoopMergeConfig ) { this.maxSegmentsToMove = maxSegmentsToMove; @@ -57,6 +59,7 @@ public CoordinatorDynamicConfig( this.replicantLifetime = replicantLifetime; this.replicationThrottleLimit = replicationThrottleLimit; this.emitBalancingStats = emitBalancingStats; + this.hadoopMergeConfig = hadoopMergeConfig; this.balancerComputeThreads = Math.min( Math.max(balancerComputeThreads, 1), Math.max(Runtime.getRuntime().availableProcessors() - 1, 1) @@ -118,6 +121,12 @@ public Set getKillDataSourceWhitelist() return killDataSourceWhitelist; } + @JsonProperty + public DruidCoordinatorHadoopMergeConfig getHadoopMergeConfig() + { + return hadoopMergeConfig; + } + @Override public String toString() { @@ -131,6 +140,7 @@ public String toString() ", balancerComputeThreads=" + balancerComputeThreads + ", emitBalancingStats=" + emitBalancingStats + ", killDataSourceWhitelist=" + killDataSourceWhitelist + + ", hadoopMergeConfig=" + hadoopMergeConfig + '}'; } @@ -170,9 +180,14 @@ public boolean equals(Object o) if (emitBalancingStats != that.emitBalancingStats) { return false; } - return !(killDataSourceWhitelist != null - ? !killDataSourceWhitelist.equals(that.killDataSourceWhitelist) - : that.killDataSourceWhitelist != null); + if (killDataSourceWhitelist != null + ? !killDataSourceWhitelist.equals(that.killDataSourceWhitelist) + : that.killDataSourceWhitelist != null) { + return false; + } + return hadoopMergeConfig != null + ? hadoopMergeConfig.equals(that.hadoopMergeConfig) + : that.hadoopMergeConfig == null; } @@ -188,6 +203,7 @@ public int hashCode() result = 31 * result + balancerComputeThreads; result = 31 * result + (emitBalancingStats ? 1 : 0); result = 31 * result + (killDataSourceWhitelist != null ? killDataSourceWhitelist.hashCode() : 0); + result = 31 * result + (hadoopMergeConfig != null ? hadoopMergeConfig.hashCode() : 0); return result; } @@ -202,10 +218,11 @@ public static class Builder private boolean emitBalancingStats; private int balancerComputeThreads; private Set killDataSourceWhitelist; + private DruidCoordinatorHadoopMergeConfig hadoopMergeConfig; public Builder() { - this(15 * 60 * 1000L, 524288000L, 100, 5, 15, 10, 1, false, null); + this(15 * 60 * 1000L, 524288000L, 100, 5, 15, 10, 1, false, null, null); } private Builder( @@ -217,7 +234,8 @@ private Builder( int replicationThrottleLimit, int balancerComputeThreads, boolean emitBalancingStats, - Set killDataSourceWhitelist + Set killDataSourceWhitelist, + DruidCoordinatorHadoopMergeConfig hadoopMergeConfig ) { this.millisToWaitBeforeDeleting = millisToWaitBeforeDeleting; @@ -229,6 +247,7 @@ private Builder( this.emitBalancingStats = emitBalancingStats; this.balancerComputeThreads = balancerComputeThreads; this.killDataSourceWhitelist = killDataSourceWhitelist; + this.hadoopMergeConfig = hadoopMergeConfig; } public Builder withMillisToWaitBeforeDeleting(long millisToWaitBeforeDeleting) @@ -279,6 +298,12 @@ public Builder withKillDataSourceWhitelist(Set killDataSourceWhitelist) return this; } + public Builder withhadoopMergeConfig(DruidCoordinatorHadoopMergeConfig hadoopMergeConfig) + { + this.hadoopMergeConfig = hadoopMergeConfig; + return this; + } + public CoordinatorDynamicConfig build() { return new CoordinatorDynamicConfig( @@ -290,7 +315,8 @@ public CoordinatorDynamicConfig build() replicationThrottleLimit, balancerComputeThreads, emitBalancingStats, - killDataSourceWhitelist + killDataSourceWhitelist, + hadoopMergeConfig ); } } diff --git a/server/src/main/java/io/druid/server/coordinator/CoordinatorHadoopMergeSpec.java b/server/src/main/java/io/druid/server/coordinator/CoordinatorHadoopMergeSpec.java new file mode 100644 index 000000000000..33dace7a0410 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/CoordinatorHadoopMergeSpec.java @@ -0,0 +1,122 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import com.google.common.base.Preconditions; +import java.util.Arrays; +import java.util.List; + +/** + */ +public class CoordinatorHadoopMergeSpec +{ + + private final String dataSource; + private final QueryGranularity queryGranularity; + private final List dimensions; + private final AggregatorFactory[] metricsSpec; + + public CoordinatorHadoopMergeSpec( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("queryGranularity") QueryGranularity queryGranularity, + @JsonProperty("dimensions") List dimensions, + @JsonProperty("metricsSpec") AggregatorFactory[] metricsSpec + ) + { + this.dataSource = Preconditions.checkNotNull(dataSource); + this.queryGranularity = queryGranularity; + this.dimensions = dimensions; + this.metricsSpec = metricsSpec; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public QueryGranularity getQueryGranularity() + { + return queryGranularity; + } + + @JsonProperty + public List getDimensions() + { + return dimensions; + } + + @JsonProperty + public AggregatorFactory[] getMetricsSpec() + { + return metricsSpec; + } + + @Override + public String toString() + { + return "CoordinatorHadoopMergeSpec{" + + "dataSource='" + dataSource + '\'' + + ", queryGranularity=" + queryGranularity + + ", dimensions=" + dimensions + + ", metricsSpec=" + Arrays.toString(metricsSpec) + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + CoordinatorHadoopMergeSpec that = (CoordinatorHadoopMergeSpec) o; + + if (dataSource != null ? !dataSource.equals(that.dataSource) : that.dataSource != null) { + return false; + } + if (queryGranularity != null ? !queryGranularity.equals(that.queryGranularity) : that.queryGranularity != null) { + return false; + } + if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) { + return false; + } + // Probably incorrect - comparing Object[] arrays with Arrays.equals + return Arrays.equals(metricsSpec, that.metricsSpec); + + } + + @Override + public int hashCode() + { + int result = dataSource != null ? dataSource.hashCode() : 0; + result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0); + result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0); + result = 31 * result + Arrays.hashCode(metricsSpec); + return result; + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java index 529bd06aba0f..32d4bec96cb3 100644 --- a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorConfig.java @@ -39,10 +39,10 @@ public abstract class DruidCoordinatorConfig @Default("PT1800s") public abstract Duration getCoordinatorIndexingPeriod(); - @Config("druid.coordinator.merge.on") - public boolean isMergeSegments() + @Config("druid.coordinator.merge.strategy") + public String getMergeStrategy() { - return false; + return null; } @Config("druid.coordinator.conversion.on") diff --git a/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorHadoopMergeConfig.java b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorHadoopMergeConfig.java new file mode 100644 index 000000000000..b9fd0adb5e80 --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/DruidCoordinatorHadoopMergeConfig.java @@ -0,0 +1,123 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; +import java.util.Map; + +/** + */ +public class DruidCoordinatorHadoopMergeConfig +{ + + private final boolean keepGap; + private final List hadoopDependencyCoordinates; + private final Map tuningConfig; + private final List hadoopMergeSpecs; + + @JsonCreator + public DruidCoordinatorHadoopMergeConfig( + @JsonProperty("keepGap") boolean keepGap, + @JsonProperty("hadoopDependencyCoordinates") List hadoopDependencyCoordinates, + @JsonProperty("tuningConfig") Map tuningConfig, + @JsonProperty("hadoopMergeSpecs") List hadoopMergeSpecs + ) + { + this.keepGap = keepGap; + this.hadoopDependencyCoordinates = hadoopDependencyCoordinates; + this.tuningConfig = tuningConfig; + this.hadoopMergeSpecs = hadoopMergeSpecs; + } + + @JsonProperty + public boolean isKeepGap() + { + return keepGap; + } + + @JsonProperty + public List getHadoopDependencyCoordinates() + { + return hadoopDependencyCoordinates; + } + + @JsonProperty + public Map getTuningConfig() + { + return tuningConfig; + } + + @JsonProperty + public List getHadoopMergeSpecs() + { + return hadoopMergeSpecs; + } + + @Override + public String toString() + { + return "DruidCoordinatorHadoopMergeConfig{" + + "keepGap=" + keepGap + + ", hadoopDependencyCoordinates=" + hadoopDependencyCoordinates + + ", tuningConfig=" + tuningConfig + + ", hadoopMergeSpecs=" + hadoopMergeSpecs + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DruidCoordinatorHadoopMergeConfig that = (DruidCoordinatorHadoopMergeConfig) o; + + if (keepGap != that.keepGap) { + return false; + } + if (hadoopDependencyCoordinates != null + ? !hadoopDependencyCoordinates.equals(that.hadoopDependencyCoordinates) + : that.hadoopDependencyCoordinates != null) { + return false; + } + if (tuningConfig != null ? !tuningConfig.equals(that.tuningConfig) : that.tuningConfig != null) { + return false; + } + return hadoopMergeSpecs != null ? hadoopMergeSpecs.equals(that.hadoopMergeSpecs) : that.hadoopMergeSpecs == null; + + } + + @Override + public int hashCode() + { + int result = (keepGap ? 1 : 0); + result = 31 * result + (hadoopDependencyCoordinates != null ? hadoopDependencyCoordinates.hashCode() : 0); + result = 31 * result + (tuningConfig != null ? tuningConfig.hashCode() : 0); + result = 31 * result + (hadoopMergeSpecs != null ? hadoopMergeSpecs.hashCode() : 0); + return result; + } +} diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorHadoopSegmentMerger.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorHadoopSegmentMerger.java new file mode 100644 index 000000000000..8fef736c548b --- /dev/null +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorHadoopSegmentMerger.java @@ -0,0 +1,274 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator.helper; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.google.inject.Inject; +import com.metamx.common.logger.Logger; +import com.metamx.emitter.service.ServiceMetricEvent; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.common.config.JacksonConfigManager; +import io.druid.server.coordinator.CoordinatorHadoopMergeSpec; +import io.druid.server.coordinator.CoordinatorStats; +import io.druid.server.coordinator.DatasourceWhitelist; +import io.druid.server.coordinator.DruidCoordinatorHadoopMergeConfig; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import io.druid.timeline.DataSegment; +import io.druid.timeline.TimelineObjectHolder; +import io.druid.timeline.VersionedIntervalTimeline; +import org.joda.time.DateTime; +import org.joda.time.Interval; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class DruidCoordinatorHadoopSegmentMerger implements DruidCoordinatorHelper +{ + + private static final Logger log = new Logger(DruidCoordinatorHadoopSegmentMerger.class); + + public static final String HADOOP_REINDEX_TASK_ID_PREFIX = "coordinator_hadoop_reindex"; + + private final IndexingServiceClient indexingServiceClient; + private final AtomicReference whiteListRef; + + private boolean scanFromOldToNew; + + @Inject + public DruidCoordinatorHadoopSegmentMerger( + IndexingServiceClient indexingServiceClient, + JacksonConfigManager configManager + ) + { + this.indexingServiceClient = indexingServiceClient; + this.whiteListRef = configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class); + this.scanFromOldToNew = true; + } + + @VisibleForTesting + DruidCoordinatorHadoopSegmentMerger( + IndexingServiceClient indexingServiceClient, + JacksonConfigManager configManager, + boolean scanFromOldToNew + ) + { + this.indexingServiceClient = indexingServiceClient; + this.whiteListRef = configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class); + this.scanFromOldToNew = scanFromOldToNew; + } + + @Override + public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) + { + final DruidCoordinatorHadoopMergeConfig hadoopMergeConfig = params.getCoordinatorDynamicConfig() + .getHadoopMergeConfig(); + if (hadoopMergeConfig == null + || hadoopMergeConfig.getHadoopMergeSpecs() == null + || hadoopMergeConfig.getHadoopMergeSpecs().isEmpty()) { + log.info("No HadoopMergeConfig was found, skipping Hadoop segment merging"); + return params; + } + + final Map hadoopMergeSpecs = new HashMap<>(); + for (CoordinatorHadoopMergeSpec spec : hadoopMergeConfig.getHadoopMergeSpecs()) { + hadoopMergeSpecs.put(spec.getDataSource(), spec); + } + + final Map tuningConfig = hadoopMergeConfig.getTuningConfig(); + final List hadoopDependencies = hadoopMergeConfig.getHadoopDependencyCoordinates(); + final boolean keepSegmentGapDuringMerge = hadoopMergeConfig.isKeepGap(); + final DatasourceWhitelist whitelist = whiteListRef.get(); + final long segmentSizeThreshold = params.getCoordinatorDynamicConfig().getMergeBytesLimit(); + + final CoordinatorStats stats = new CoordinatorStats(); + final Map> dataSources = Maps.newHashMap(); + + // Find serviced segments by using a timeline + for (DataSegment dataSegment : params.getAvailableSegments()) { + if (whitelist == null || whitelist.contains(dataSegment.getDataSource())) { + VersionedIntervalTimeline timeline = dataSources.get(dataSegment.getDataSource()); + if (timeline == null) { + timeline = new VersionedIntervalTimeline(Ordering.natural()); + dataSources.put(dataSegment.getDataSource(), timeline); + } + timeline.add( + dataSegment.getInterval(), + dataSegment.getVersion(), + dataSegment.getShardSpec().createChunk(dataSegment) + ); + } + } + + for (final Map.Entry> entry : dataSources.entrySet()) { + final String dataSource = entry.getKey(); + if (!isPreviousTaskFinished(dataSource)) { + continue; + } + + final CoordinatorHadoopMergeSpec mergeSpec = hadoopMergeSpecs.get(dataSource); + if (mergeSpec == null) { + log.info("Didn't find CoordinatorHadoopMergeSpec for dataSource [%s], skip merging", dataSource); + continue; + } + + log.info( + "Finding imbalanced segments for datasource [%s], scanning from [%s]", + dataSource, + scanFromOldToNew ? "old to new" : "new to old" + ); + + final VersionedIntervalTimeline timeline = entry.getValue(); + final List> timelineObjects = timeline.lookup( + new Interval(new DateTime(0), new DateTime("3000-01-01")) + ); + final List unbalancedIntervals = new ArrayList(); + + long currTotalSize = 0; + Interval intervalToReindex = null; + boolean shouldBeMerged = false; + + final Iterator> listIterator = scanFromOldToNew + ? timelineObjects.iterator() + : Lists.reverse(timelineObjects) + .iterator(); + + while (listIterator.hasNext()) { + TimelineObjectHolder objectHolder = listIterator.next(); + final Interval currInterval = objectHolder.getInterval(); + if (intervalToReindex == null) { + intervalToReindex = currInterval; + } else { + if (currInterval.abuts(intervalToReindex)) { + intervalToReindex = expandInterval(intervalToReindex, currInterval); + } else if (keepSegmentGapDuringMerge) { + intervalToReindex = currInterval; + currTotalSize = 0; + shouldBeMerged = false; + } else { + intervalToReindex = expandInterval(intervalToReindex, currInterval); + } + } + + for (DataSegment segment : objectHolder.getObject().payloads()) { + if (segment.getSize() < segmentSizeThreshold) { + shouldBeMerged = true; + } + currTotalSize += segment.getSize(); + log.debug( + "After adding segment [%s], currTotalSize [%d], target [%d]", + segment.getIdentifier(), + currTotalSize, + segmentSizeThreshold + ); + } + + if (currTotalSize >= segmentSizeThreshold) { + if (shouldBeMerged) { + log.info("Adding unbalanced interval [%s]", intervalToReindex); + unbalancedIntervals.add(intervalToReindex); + } + currTotalSize = 0; + intervalToReindex = null; + shouldBeMerged = false; + } + + } + + if (!unbalancedIntervals.isEmpty()) { + submitHadoopReindexTask(dataSource, unbalancedIntervals, stats, mergeSpec, tuningConfig, hadoopDependencies); + } + } + + // invert the scan direction if "keepGap" is set + if (keepSegmentGapDuringMerge) { + scanFromOldToNew = !scanFromOldToNew; + } + + log.info("Issued merge requests for [%s] dataSource", stats.getGlobalStats().get("hadoopMergeCount").get()); + + params.getEmitter().emit( + new ServiceMetricEvent.Builder().build( + "coordinator/hadoopMerge/count", stats.getGlobalStats().get("hadoopMergeCount") + ) + ); + return params.buildFromExisting().withCoordinatorStats(stats).build(); + } + + private Interval expandInterval(Interval intervalToReindex, Interval currInterval) + { + return scanFromOldToNew + ? intervalToReindex.withEnd(currInterval.getEnd()) + : intervalToReindex.withStart(currInterval.getStart()); + } + + private void submitHadoopReindexTask( + String dataSource, + List intervalsToReindex, + CoordinatorStats stats, + CoordinatorHadoopMergeSpec mergeSpec, + Map tuningConfig, + List hadoopDependencies + ) + { + final String taskId = indexingServiceClient.hadoopMergeSegments( + dataSource, + intervalsToReindex, + mergeSpec.getMetricsSpec(), + mergeSpec.getQueryGranularity(), + mergeSpec.getDimensions(), + tuningConfig, + hadoopDependencies + ); + log.info( + "Submitted Hadoop Reindex Task for dataSource [%s] at intervals [%s]. TaskID is [%s]", + dataSource, + intervalsToReindex, + taskId + ); + stats.addToGlobalStat("hadoopMergeCount", 1); + } + + private boolean isPreviousTaskFinished(String dataSource) + { + final List> incompleteTasks = indexingServiceClient.getIncompleteTasks(); + for (Map task : incompleteTasks) { + final String taskId = (String) task.get("id"); + if (taskId != null && taskId.startsWith(HADOOP_REINDEX_TASK_ID_PREFIX + "_" + dataSource)) { + log.info( + "An existing Hadoop Reindex Task [%s] for dataSource [%s] is still running, skipping...", + taskId, + dataSource + ); + return false; + } + } + return true; + } + +} diff --git a/server/src/main/java/io/druid/server/http/CoordinatorDynamicConfigsResource.java b/server/src/main/java/io/druid/server/http/CoordinatorDynamicConfigsResource.java index 0d955b915bf2..588f2aaf4b74 100644 --- a/server/src/main/java/io/druid/server/http/CoordinatorDynamicConfigsResource.java +++ b/server/src/main/java/io/druid/server/http/CoordinatorDynamicConfigsResource.java @@ -75,20 +75,18 @@ public Response getDynamicConfigs() // default value is used for backwards compatibility @POST @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) public Response setDynamicConfigs(final CoordinatorDynamicConfig dynamicConfig, @HeaderParam(AuditManager.X_DRUID_AUTHOR) @DefaultValue("") final String author, @HeaderParam(AuditManager.X_DRUID_COMMENT) @DefaultValue("") final String comment, @Context HttpServletRequest req ) { - if (!manager.set( + return manager.set( CoordinatorDynamicConfig.CONFIG_KEY, dynamicConfig, new AuditInfo(author, comment, req.getRemoteAddr()) - )) { - return Response.status(Response.Status.BAD_REQUEST).build(); - } - return Response.ok().build(); + ) ? Response.ok().build() : Response.status(Response.Status.INTERNAL_SERVER_ERROR).build(); } @GET @@ -108,8 +106,7 @@ public Response getDatasourceRuleHistory( CoordinatorDynamicConfig.CONFIG_KEY, count ) - ) - .build(); + ).build(); } catch (IllegalArgumentException e) { return Response.status(Response.Status.BAD_REQUEST) @@ -122,9 +119,7 @@ public Response getDatasourceRuleHistory( CoordinatorDynamicConfig.CONFIG_KEY, CoordinatorDynamicConfig.CONFIG_KEY, theInterval - ) - ) - .build(); + )).build(); } } diff --git a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java index 3335eb820add..6098a63c0263 100644 --- a/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java +++ b/server/src/main/java/io/druid/server/initialization/ZkPathsConfig.java @@ -25,29 +25,28 @@ public class ZkPathsConfig { @JsonProperty - private - String base = "druid"; + private String base = "druid"; + @JsonProperty - private - String propertiesPath; + private String propertiesPath; + @JsonProperty - private - String announcementsPath; + private String announcementsPath; + @JsonProperty @Deprecated - private - String servedSegmentsPath; + private String servedSegmentsPath; + @JsonProperty - private - String liveSegmentsPath; + private String liveSegmentsPath; + @JsonProperty - private - String coordinatorPath; + private String coordinatorPath; + @JsonProperty - private - String loadQueuePath; + private String loadQueuePath; + @JsonProperty - private - String connectorPath; + private String connectorPath; public String getBase() { diff --git a/server/src/test/java/io/druid/client/indexing/ClientAppendQueryTest.java b/server/src/test/java/io/druid/client/indexing/ClientAppendTaskTest.java similarity index 75% rename from server/src/test/java/io/druid/client/indexing/ClientAppendQueryTest.java rename to server/src/test/java/io/druid/client/indexing/ClientAppendTaskTest.java index c470cfeffe4a..5d6b413ec4fb 100644 --- a/server/src/test/java/io/druid/client/indexing/ClientAppendQueryTest.java +++ b/server/src/test/java/io/druid/client/indexing/ClientAppendTaskTest.java @@ -29,9 +29,9 @@ import java.util.List; -public class ClientAppendQueryTest +public class ClientAppendTaskTest { - private ClientAppendQuery clientAppendQuery; + private ClientAppendTask clientAppendTask; private static final String DATA_SOURCE = "data_source"; private List segments = Lists.newArrayList( new DataSegment(DATA_SOURCE, new Interval(new DateTime(), new DateTime().plus(1)), new DateTime().toString(), null, @@ -39,31 +39,31 @@ public class ClientAppendQueryTest @Before public void setUp() { - clientAppendQuery = new ClientAppendQuery(DATA_SOURCE, segments); + clientAppendTask = new ClientAppendTask(DATA_SOURCE, segments); } @Test public void testGetType() { - Assert.assertEquals("append",clientAppendQuery.getType()); + Assert.assertEquals("append", clientAppendTask.getType()); } @Test public void testGetDataSource() { - Assert.assertEquals(DATA_SOURCE, clientAppendQuery.getDataSource()); + Assert.assertEquals(DATA_SOURCE, clientAppendTask.getDataSource()); } @Test public void testGetSegments() { - Assert.assertEquals(segments, clientAppendQuery.getSegments()); + Assert.assertEquals(segments, clientAppendTask.getSegments()); } @Test public void testToString() { - Assert.assertTrue(clientAppendQuery.toString().contains(DATA_SOURCE)); - Assert.assertTrue(clientAppendQuery.toString().contains(segments.toString())); + Assert.assertTrue(clientAppendTask.toString().contains(DATA_SOURCE)); + Assert.assertTrue(clientAppendTask.toString().contains(segments.toString())); } } diff --git a/server/src/test/java/io/druid/client/indexing/ClientConversionQueryTest.java b/server/src/test/java/io/druid/client/indexing/ClientConversionTaskTest.java similarity index 64% rename from server/src/test/java/io/druid/client/indexing/ClientConversionQueryTest.java rename to server/src/test/java/io/druid/client/indexing/ClientConversionTaskTest.java index c638e2d0fdd5..932aaff131b8 100644 --- a/server/src/test/java/io/druid/client/indexing/ClientConversionQueryTest.java +++ b/server/src/test/java/io/druid/client/indexing/ClientConversionTaskTest.java @@ -25,9 +25,9 @@ import org.junit.Assert; import org.junit.Test; -public class ClientConversionQueryTest +public class ClientConversionTaskTest { - private ClientConversionQuery clientConversionQuery; + private ClientConversionTask clientConversionTask; private static final String DATA_SOURCE = "data_source"; private static final Interval INTERVAL = new Interval(new DateTime(), new DateTime().plus(1)); private static final DataSegment DATA_SEGMENT = new DataSegment(DATA_SOURCE, INTERVAL, new DateTime().toString(), null, @@ -36,31 +36,31 @@ public class ClientConversionQueryTest @Test public void testGetType() { - clientConversionQuery = new ClientConversionQuery(DATA_SEGMENT); - Assert.assertEquals("version_converter", clientConversionQuery.getType()); + clientConversionTask = new ClientConversionTask(DATA_SEGMENT); + Assert.assertEquals("version_converter", clientConversionTask.getType()); } @Test public void testGetDataSource() { - clientConversionQuery = new ClientConversionQuery(DATA_SEGMENT); - Assert.assertEquals(DATA_SOURCE, clientConversionQuery.getDataSource()); + clientConversionTask = new ClientConversionTask(DATA_SEGMENT); + Assert.assertEquals(DATA_SOURCE, clientConversionTask.getDataSource()); } @Test public void testGetInterval() { - clientConversionQuery = new ClientConversionQuery(DATA_SEGMENT); - Assert.assertEquals(INTERVAL, clientConversionQuery.getInterval()); + clientConversionTask = new ClientConversionTask(DATA_SEGMENT); + Assert.assertEquals(INTERVAL, clientConversionTask.getInterval()); } @Test public void testGetSegment() { - clientConversionQuery = new ClientConversionQuery(DATA_SEGMENT); - Assert.assertEquals(DATA_SEGMENT, clientConversionQuery.getSegment()); - clientConversionQuery = new ClientConversionQuery(DATA_SOURCE,INTERVAL); - Assert.assertNull(clientConversionQuery.getSegment()); + clientConversionTask = new ClientConversionTask(DATA_SEGMENT); + Assert.assertEquals(DATA_SEGMENT, clientConversionTask.getSegment()); + clientConversionTask = new ClientConversionTask(DATA_SOURCE, INTERVAL); + Assert.assertNull(clientConversionTask.getSegment()); } } diff --git a/server/src/test/java/io/druid/client/indexing/ClientHadoopIndexTaskTest.java b/server/src/test/java/io/druid/client/indexing/ClientHadoopIndexTaskTest.java new file mode 100644 index 000000000000..54eae48b5251 --- /dev/null +++ b/server/src/test/java/io/druid/client/indexing/ClientHadoopIndexTaskTest.java @@ -0,0 +1,131 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.client.indexing; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.ArbitraryGranularitySpec; +import io.druid.segment.indexing.granularity.GranularitySpec; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + */ +public class ClientHadoopIndexTaskTest +{ + private static final String TASK_ID = "coordinator_index_task_1234"; + private static final String DATA_SOURCE = "wikipedia"; + + private static final List INTERVAL_LIST = ImmutableList.of( + new Interval("2012-01-01/2012-01-03"), + new Interval("2012-01-05/2012-01-08"), + new Interval("2012-01-10/2012-01-14") + ); + + private static final List DIMENSIONS = ImmutableList.of("a", "b", "c"); + + private static final AggregatorFactory[] AGGREGATORS = new AggregatorFactory[]{ + new CountAggregatorFactory("name"), + new DoubleSumAggregatorFactory( + "added", + "added" + ), + new DoubleSumAggregatorFactory( + "deleted", + "deleted" + ), + new DoubleSumAggregatorFactory( + "delta", + "delta" + ) + }; + + private static final Map TUNING_CONFIG = ImmutableMap.builder() + .put("type", "hadoop") + .put( + "partitionSpec", + ImmutableMap.of( + "assumeGrouped", + true, + "targetPartitionSize", + 1000, + "type", + "hashed" + ) + ) + .put("rowFlushBoundary", 10000) + .build(); + + @Test + public void testInit() throws IOException + { + final ClientHadoopIndexTask clientHadoopIndexTask = new ClientHadoopIndexTask( + TASK_ID, + DATA_SOURCE, + INTERVAL_LIST, + AGGREGATORS, + DIMENSIONS, + QueryGranularity.DAY, + TUNING_CONFIG, + null, + new DefaultObjectMapper() + ); + + final ClientHadoopIngestionSpec hadoopIngestionSpec = clientHadoopIndexTask.getHadoopIngestionSpec(); + final DataSchema dataSchema = hadoopIngestionSpec.getDataSchema(); + final GranularitySpec granularitySpec = dataSchema.getGranularitySpec(); + final ClientHadoopIOConfig hadoopIOConfig = hadoopIngestionSpec.getIOConfig(); + final Map tuningConfig = hadoopIngestionSpec.getTuningConfig(); + + Assert.assertEquals("index_hadoop", clientHadoopIndexTask.getType()); + Assert.assertEquals(TASK_ID, clientHadoopIndexTask.getId()); + Assert.assertEquals(DATA_SOURCE, dataSchema.getDataSource()); + Assert.assertArrayEquals(AGGREGATORS, dataSchema.getAggregators()); + Assert.assertEquals(QueryGranularity.DAY, granularitySpec.getQueryGranularity()); + Assert.assertTrue(granularitySpec instanceof ArbitraryGranularitySpec); + Assert.assertEquals(Sets.newHashSet(INTERVAL_LIST), granularitySpec.bucketIntervals().get()); + Assert.assertEquals(TUNING_CONFIG, tuningConfig); + Assert.assertEquals(ImmutableMap.of( + "type", + "dataSource", + "ingestionSpec", + ImmutableMap.of("dataSource", + DATA_SOURCE, + "intervals", + INTERVAL_LIST, + "dimensions", + DIMENSIONS + ) + ), hadoopIOConfig.getPathSpec()); + } + +} diff --git a/server/src/test/java/io/druid/client/indexing/ClientKillQueryTest.java b/server/src/test/java/io/druid/client/indexing/ClientKillTaskTest.java similarity index 79% rename from server/src/test/java/io/druid/client/indexing/ClientKillQueryTest.java rename to server/src/test/java/io/druid/client/indexing/ClientKillTaskTest.java index a1dc2db0aaa2..9e96675ebe4a 100644 --- a/server/src/test/java/io/druid/client/indexing/ClientKillQueryTest.java +++ b/server/src/test/java/io/druid/client/indexing/ClientKillTaskTest.java @@ -26,39 +26,39 @@ import org.junit.Before; import org.junit.Test; -public class ClientKillQueryTest +public class ClientKillTaskTest { private static final String DATA_SOURCE = "data_source"; private static final Interval INTERVAL = new Interval(new DateTime(), new DateTime().plus(1)); - ClientKillQuery clientKillQuery; + private ClientKillTask clientKillTask; @Before public void setUp() { - clientKillQuery = new ClientKillQuery(DATA_SOURCE, INTERVAL); + clientKillTask = new ClientKillTask(DATA_SOURCE, INTERVAL); } @After public void tearDown() { - clientKillQuery = null; + clientKillTask = null; } @Test public void testGetType() { - Assert.assertEquals("kill", clientKillQuery.getType()); + Assert.assertEquals("kill", clientKillTask.getType()); } @Test public void testGetDataSource() { - Assert.assertEquals(DATA_SOURCE, clientKillQuery.getDataSource()); + Assert.assertEquals(DATA_SOURCE, clientKillTask.getDataSource()); } @Test public void testGetInterval() { - Assert.assertEquals(INTERVAL, clientKillQuery.getInterval()); + Assert.assertEquals(INTERVAL, clientKillTask.getInterval()); } } diff --git a/server/src/test/java/io/druid/client/indexing/ClientMergeQueryTest.java b/server/src/test/java/io/druid/client/indexing/ClientMergeTaskTest.java similarity index 72% rename from server/src/test/java/io/druid/client/indexing/ClientMergeQueryTest.java rename to server/src/test/java/io/druid/client/indexing/ClientMergeTaskTest.java index fb6b193cb45f..239025508bfe 100644 --- a/server/src/test/java/io/druid/client/indexing/ClientMergeQueryTest.java +++ b/server/src/test/java/io/druid/client/indexing/ClientMergeTaskTest.java @@ -29,7 +29,7 @@ import java.util.List; -public class ClientMergeQueryTest +public class ClientMergeTaskTest { private static final String DATA_SOURCE = "data_source"; private static final Interval INTERVAL = new Interval(new DateTime(), new DateTime().plus(1)); @@ -37,37 +37,37 @@ public class ClientMergeQueryTest null, null, null, 0, 0); private static final List SEGMENT_LIST = Lists.newArrayList(DATA_SEGMENT); private static final List AGGREGATOR_LIST = Lists.newArrayList(); - private static final ClientMergeQuery CLIENT_MERGE_QUERY = new ClientMergeQuery(DATA_SOURCE,SEGMENT_LIST,AGGREGATOR_LIST);; + private static final ClientMergeTask CLIENT_MERGE_TASK = new ClientMergeTask(DATA_SOURCE, SEGMENT_LIST, AGGREGATOR_LIST);; @Test public void testGetType() { - Assert.assertEquals("merge", CLIENT_MERGE_QUERY.getType()); + Assert.assertEquals("merge", CLIENT_MERGE_TASK.getType()); } @Test public void testGetDataSource() { - Assert.assertEquals(DATA_SOURCE, CLIENT_MERGE_QUERY.getDataSource()); + Assert.assertEquals(DATA_SOURCE, CLIENT_MERGE_TASK.getDataSource()); } @Test public void testGetSegments() { - Assert.assertEquals(SEGMENT_LIST, CLIENT_MERGE_QUERY.getSegments()); + Assert.assertEquals(SEGMENT_LIST, CLIENT_MERGE_TASK.getSegments()); } @Test public void testGetAggregators() { - Assert.assertEquals(AGGREGATOR_LIST, CLIENT_MERGE_QUERY.getAggregators()); + Assert.assertEquals(AGGREGATOR_LIST, CLIENT_MERGE_TASK.getAggregators()); } @Test public void testToString() { - Assert.assertTrue(CLIENT_MERGE_QUERY.toString().contains(DATA_SOURCE)); - Assert.assertTrue(CLIENT_MERGE_QUERY.toString().contains(SEGMENT_LIST.toString())); - Assert.assertTrue(CLIENT_MERGE_QUERY.toString().contains(AGGREGATOR_LIST.toString())); + Assert.assertTrue(CLIENT_MERGE_TASK.toString().contains(DATA_SOURCE)); + Assert.assertTrue(CLIENT_MERGE_TASK.toString().contains(SEGMENT_LIST.toString())); + Assert.assertTrue(CLIENT_MERGE_TASK.toString().contains(AGGREGATOR_LIST.toString())); } } diff --git a/server/src/test/java/io/druid/server/coordinator/CoordinatorHadoopMergeSpecTest.java b/server/src/test/java/io/druid/server/coordinator/CoordinatorHadoopMergeSpecTest.java new file mode 100644 index 000000000000..b8bed9fbaeb1 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/CoordinatorHadoopMergeSpecTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongMinAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; + +/** + */ +public class CoordinatorHadoopMergeSpecTest +{ + + @Test + public void testSerde() throws Exception + { + final ObjectMapper mapper = new DefaultObjectMapper(); + final String dataSource = "wikipedia"; + final QueryGranularity queryGranularity = QueryGranularity.ALL; + final List dimensions = ImmutableList.of("a", "b", "c"); + final AggregatorFactory[] metricsSpec = new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new DoubleSumAggregatorFactory("added", "added"), + new LongSumAggregatorFactory("deleted", "deleted"), + new LongMinAggregatorFactory("delta", "delta") + }; + + final CoordinatorHadoopMergeSpec mergeSpec = new CoordinatorHadoopMergeSpec( + dataSource, + queryGranularity, + dimensions, + metricsSpec + ); + + Assert.assertEquals( + mergeSpec, + mapper.readValue(mapper.writeValueAsString(mergeSpec), CoordinatorHadoopMergeSpec.class) + ); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorConfigTest.java index 33666d3d08de..d0ef071df879 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorConfigTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorConfigTest.java @@ -42,8 +42,8 @@ public void testDeserialization() throws Exception Assert.assertEquals(new Duration("PT300s"), config.getCoordinatorStartDelay()); Assert.assertEquals(new Duration("PT60s"), config.getCoordinatorPeriod()); Assert.assertEquals(new Duration("PT1800s"), config.getCoordinatorIndexingPeriod()); - Assert.assertFalse(config.isMergeSegments()); Assert.assertFalse(config.isConvertSegments()); + Assert.assertNull(config.getMergeStrategy()); Assert.assertFalse(config.isKillSegments()); Assert.assertEquals(86400000, config.getCoordinatorKillPeriod().getMillis()); Assert.assertEquals(-1000, config.getCoordinatorKillDurationToRetain().getMillis()); @@ -56,7 +56,7 @@ public void testDeserialization() throws Exception props.setProperty("druid.coordinator.startDelay", "PT1s"); props.setProperty("druid.coordinator.period", "PT1s"); props.setProperty("druid.coordinator.period.indexingPeriod", "PT1s"); - props.setProperty("druid.coordinator.merge.on", "true"); + props.setProperty("druid.coordinator.merge.strategy", "hadoop"); props.setProperty("druid.coordinator.conversion.on", "true"); props.setProperty("druid.coordinator.kill.on", "true"); props.setProperty("druid.coordinator.kill.period", "PT1s"); @@ -71,9 +71,9 @@ public void testDeserialization() throws Exception Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorStartDelay()); Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorPeriod()); Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorIndexingPeriod()); - Assert.assertTrue(config.isMergeSegments()); Assert.assertTrue(config.isConvertSegments()); Assert.assertTrue(config.isKillSegments()); + Assert.assertEquals("hadoop", config.getMergeStrategy()); Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorKillPeriod()); Assert.assertEquals(new Duration("PT1s"), config.getCoordinatorKillDurationToRetain()); Assert.assertEquals(10000, config.getCoordinatorKillMaxSegments()); diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorHadoopMergeConfigTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorHadoopMergeConfigTest.java new file mode 100644 index 000000000000..b3de5b44b0c4 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorHadoopMergeConfigTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.DoubleMaxAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; +import io.druid.query.aggregation.LongMinAggregationTest; +import io.druid.query.aggregation.LongMinAggregatorFactory; +import io.druid.query.aggregation.LongSumAggregatorFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.Map; + + +/** + */ +public class DruidCoordinatorHadoopMergeConfigTest +{ + + private static final List HADOOP_COORDINATES = ImmutableList.of("org.apache.hadoop:hadoop-client:2.3.0"); + private static final Map TUNING_CONFIG = ImmutableMap.builder() + .put("type", "hadoop") + .put( + "partitionSpec", + ImmutableMap.of( + "assumeGrouped", + true, + "targetPartitionSize", + 1000, + "type", + "hashed" + ) + ) + .put("rowFlushBoundary", 10000) + .build(); + + @Test + public void testSerde() throws Exception + { + final ObjectMapper mapper = new DefaultObjectMapper(); + final DruidCoordinatorHadoopMergeConfig config = new DruidCoordinatorHadoopMergeConfig( + true, + HADOOP_COORDINATES, + TUNING_CONFIG, + ImmutableList.of( + new CoordinatorHadoopMergeSpec( + "datasource1", + QueryGranularity.DAY, + ImmutableList.of("d1", "d2", "d3"), + new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new DoubleSumAggregatorFactory("added", "added"), + new LongSumAggregatorFactory("deleted", "deleted"), + new LongMinAggregatorFactory("delta", "delta") + } + ), + new CoordinatorHadoopMergeSpec( + "datasource2", + QueryGranularity.HOUR, + ImmutableList.of("d4", "d5", "d6"), + new AggregatorFactory[] { + new CountAggregatorFactory("count2"), + new DoubleMaxAggregatorFactory("added2", "added2"), + new LongSumAggregatorFactory("deleted2", "deleted2"), + new LongMinAggregatorFactory("delta2", "delta2") + } + ) + ) + ); + + Assert.assertEquals( + config, + mapper.readValue(mapper.writeValueAsString(config), DruidCoordinatorHadoopMergeConfig.class) + ); + } +} diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java index cbc59e9fda0e..b0f1d95580d2 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorRuleRunnerTest.java @@ -499,7 +499,7 @@ public void testDropRemove() throws Exception EasyMock.expect(coordinator.getDynamicConfigs()).andReturn( new CoordinatorDynamicConfig( - 0, 0, 0, 0, 1, 24, 0, false, null + 0, 0, 0, 0, 1, 24, 0, false, null, null ) ).anyTimes(); coordinator.removeSegment(EasyMock.anyObject()); @@ -1006,7 +1006,7 @@ public void testReplicantThrottleAcrossTiers() throws Exception { EasyMock.expect(coordinator.getDynamicConfigs()).andReturn( new CoordinatorDynamicConfig( - 0, 0, 0, 0, 1, 7, 0, false, null + 0, 0, 0, 0, 1, 7, 0, false, null, null ) ).atLeastOnce(); coordinator.removeSegment(EasyMock.anyObject()); @@ -1182,7 +1182,7 @@ private void mockCoordinator() { EasyMock.expect(coordinator.getDynamicConfigs()).andReturn( new CoordinatorDynamicConfig( - 0, 0, 0, 0, 1, 24, 0, false, null + 0, 0, 0, 0, 1, 24, 0, false, null, null ) ).anyTimes(); coordinator.removeSegment(EasyMock.anyObject()); diff --git a/server/src/test/java/io/druid/server/coordinator/TestDruidCoordinatorConfig.java b/server/src/test/java/io/druid/server/coordinator/TestDruidCoordinatorConfig.java index 112da1440a21..4be9db4e6821 100644 --- a/server/src/test/java/io/druid/server/coordinator/TestDruidCoordinatorConfig.java +++ b/server/src/test/java/io/druid/server/coordinator/TestDruidCoordinatorConfig.java @@ -33,10 +33,14 @@ public class TestDruidCoordinatorConfig extends DruidCoordinatorConfig private final int coordinatorKillMaxSegments; private final String consoleStatic; - private final boolean mergeSegments; private final boolean convertSegments; + public TestDruidCoordinatorConfig() + { + this(null, null, null, null, null, null, 0, null, false, false); + } + public TestDruidCoordinatorConfig( Duration coordinatorStartDelay, Duration coordinatorPeriod, @@ -80,12 +84,6 @@ public Duration getCoordinatorIndexingPeriod() return coordinatorIndexingPeriod; } - @Override - public boolean isMergeSegments() - { - return mergeSegments; - } - @Override public boolean isConvertSegments() { diff --git a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorHadoopSegmentMergerTest.java b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorHadoopSegmentMergerTest.java new file mode 100644 index 000000000000..6b2526539fe8 --- /dev/null +++ b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorHadoopSegmentMergerTest.java @@ -0,0 +1,1035 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.coordinator.helper; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.metamx.common.Pair; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.client.indexing.IndexingServiceClient; +import io.druid.common.config.JacksonConfigManager; +import io.druid.granularity.QueryGranularity; +import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.server.coordinator.CoordinatorDynamicConfig; +import io.druid.server.coordinator.CoordinatorHadoopMergeSpec; +import io.druid.server.coordinator.DatasourceWhitelist; +import io.druid.server.coordinator.DruidCoordinatorHadoopMergeConfig; +import io.druid.server.coordinator.DruidCoordinatorRuntimeParams; +import io.druid.timeline.DataSegment; +import io.druid.timeline.partition.HashBasedNumberedShardSpec; +import io.druid.timeline.partition.LinearShardSpec; +import io.druid.timeline.partition.NumberedShardSpec; +import io.druid.timeline.partition.SingleDimensionShardSpec; +import org.easymock.EasyMock; +import org.joda.time.Interval; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +/** + */ +public class DruidCoordinatorHadoopSegmentMergerTest +{ + private static final long mergeBytesLimit = 100; + + @Test + public void testNoMerges() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(100).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("2").size(100).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("2").size(100).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("2").size(100).build() + ); + + Assert.assertEquals( + ImmutableList.of(), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of(), + merge(segments, false, false) + ); + } + + @Test + public void testMergeAtStart() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(20).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("2").size(100).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("2").size(100).build() + ); + + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-01/P2D")))), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-01/P2D")))), + merge(segments, false, false) + ); + } + + @Test + public void testMergeAtEnd() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(100).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("2").size(100).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("2").size(20).build() + ); + + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-03/P2D")))), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-03/P2D")))), + merge(segments, false, false) + ); + } + + @Test + public void testMergeInMiddle() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(100).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("2").size(20).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("2").size(100).build() + ); + + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-02/P2D")))), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-02/P2D")))), + merge(segments, false, false) + ); + } + + @Test + public void testMergeNoncontiguous() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(10).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("2").size(30).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("2").size(70).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("2").size(30).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-07/P1D")).version("2").size(150).build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of( + new Interval("2012-01-01/2012-01-05"), + new Interval("2012-01-05/2012-01-08") + ) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-04/2012-01-06")))), + merge(segments, false, false) + ); + } + + @Test + public void testMergeNoncontiguousWithKeepGap() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(10).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("2").size(20).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("2").size(70).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("2").size(30).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-07/P1D")).version("2").size(150).build() + ); + + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-03/2012-01-06")))), + merge(segments, true, true) + ); + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-04/2012-01-06")))), + merge(segments, true, false) + ); + } + + @Test + public void testMergeSeries() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(50).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("2").size(50).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("2").size(50).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("2").size(50).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("2").size(50).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(50).build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of( + new Interval("2012-01-01/P2D"), + new Interval("2012-01-03/P2D"), + new Interval("2012-01-05/P2d") + ) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of( + new Interval("2012-01-05/P2d"), + new Interval("2012-01-03/P2D"), + new Interval("2012-01-01/P2D") + ) + ) + ), + merge(segments, false, false) + ); + } + + @Test + public void testOverlappingMerge1() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(20).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("2").size(50).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P4D")).version("2").size(30).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("3").size(20).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("4").size(20).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("3").size(20).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-07/P1D")).version("2").size(40).build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of( + new Interval("2012-01-01/2012-01-04"), + new Interval("2012-01-04/2012-01-08") + ) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-04/P4D"), new Interval("2012-01-01/P3D")) + ) + ), merge(segments, false, false) + ); + } + + @Test + public void testOverlappingMerge2() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P8D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("3").size(8).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("3").size(8).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-09/P1D")).version("3").size(8).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-10/P1D")).version("3").size(8).build() + ); + + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-01/2012-01-09")))), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-04/2012-01-11")))), + merge(segments, false, false) + ); + } + + @Test + public void testOverlappingMerge3() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P8D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("3").size(8).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("3").size(8).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-09/P1D")).version("3").size(8).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-10/P1D")).version("3").size(8).build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-01/2012-01-04"), new Interval("2012-01-04/2012-01-11")) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-04/2012-01-11"), new Interval("2012-01-01/2012-01-04")) + ) + ), + merge(segments, false, false) + ); + } + + @Test + public void testOverlappingMerge4() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P4D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("3").size(25).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("1").size(25).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("3").size(25).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(80).build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of( + new Interval("2012-01-01/2012-01-03"), + new Interval("2012-01-03/2012-01-05"), + new Interval("2012-01-05/2012-01-07") + ) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of( + new Interval("2012-01-05/2012-01-07"), + new Interval("2012-01-03/2012-01-05"), + new Interval("2012-01-01/2012-01-03") + ) + ) + ), + merge(segments, false, false) + ); + } + + @Test + public void testOverlappingMerge5() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(15).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P4D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("3").size(25).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("4").size(25).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("3").size(25).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(80).build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-01/2012-01-04"), new Interval("2012-01-04/2012-01-07")) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-05/2012-01-07"), new Interval("2012-01-02/2012-01-05")) + ) + ), + merge(segments, false, false) + ); + } + + @Test + public void testOverlappingMerge6() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P4D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("3").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("1").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("3").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(80).build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-01/2012-01-03"), new Interval("2012-01-03/2012-01-07")) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-04/2012-01-07"), new Interval("2012-01-01/2012-01-04")) + ) + ), + merge(segments, false, false) + ); + } + + @Test + public void testOverlappingMerge7() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P4D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("3").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("4").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("3").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(97).build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-01/2012-01-03"), new Interval("2012-01-03/2012-01-07")) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-03/2012-01-07"), new Interval("2012-01-01/2012-01-03")) + ) + ), + merge(segments, false, false) + ); + } + + @Test + public void testOverlappingMerge8() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P4D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("3").size(25).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("1").size(25).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("3").size(25).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(80).build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-01/2012-01-04"), new Interval("2012-01-04/2012-01-06")) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-05/2012-01-07"), new Interval("2012-01-03/2012-01-05")) + ) + ), + merge(segments, false, false) + ); + } + + @Test + public void testOverlappingMerge9() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P4D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("3").size(25).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("4").size(25).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("3").size(25).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(80).build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-01/2012-01-04"), new Interval("2012-01-04/2012-01-07")) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-05/2012-01-07"), new Interval("2012-01-02/2012-01-05")) + ) + ), + merge(segments, false, false) + ); + } + + @Test + public void testOverlappingMerge10() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P4D")).version("2").size(120).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("3").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("4").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("3").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(80).build() + ); + + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-01/2012-01-03")))), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-02/2012-01-07")))), + merge(segments, false, false) + ); + } + + @Test + public void testOverlappingMerge11() + { + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(80).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P4D")).version("2").size(120).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-03/P1D")).version("3").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P1D")).version("1").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("3").size(1).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("2").size(80).build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-01/2012-01-03"), new Interval("2012-01-03/2012-01-05")) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-04/2012-01-07"), new Interval("2012-01-02/2012-01-04")) + ) + ), + merge(segments, false, false) + ); + } + + @Test + public void testMergeLinearShardSpecs() + { + final List segments = ImmutableList.of( + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-01/P1D")) + .version("1") + .shardSpec(new LinearShardSpec(1)) + .size(30) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-01/P1D")) + .version("1") + .shardSpec(new LinearShardSpec(2)) + .size(20) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-01/P1D")) + .version("1") + .shardSpec(new LinearShardSpec(20)) + .size(200) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-02/P1D")) + .version("1") + .shardSpec(new LinearShardSpec(7)) + .size(40) + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-03/P1D")) + .version("1") + .shardSpec(new LinearShardSpec(1)) + .size(30) + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-03/P1D")) + .version("1") + .shardSpec(new LinearShardSpec(1500)) + .size(30) + .build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of( + new Interval("2012-01-01/2012-01-02"), + new Interval("2012-01-02/2012-01-04") + ) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of( + new Interval("2012-01-02/2012-01-04"), + new Interval("2012-01-01/2012-01-02") + ) + ) + ), + merge(segments, false, false) + ); + } + + @Test + public void testMergeIncompleteNumberedShardSpecs() + { + final List segments = ImmutableList.of( + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-01/P1D")) + .version("1") + .shardSpec(new NumberedShardSpec(0, 2)) + .size(30) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-02/P1D")) + .version("1") + .shardSpec(new NumberedShardSpec(1, 2)) + .size(40) + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-03/P1D")) + .version("1") + .shardSpec(new NumberedShardSpec(2, 1500)) + .size(30) + .build() + ); + + Assert.assertEquals( + ImmutableList.of(), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of(), + merge(segments, false, false) + ); + } + + @Test + public void testMergeNumberedShardSpecs() + { + final List segments = ImmutableList.of( + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-01/P1D")) + .version("1") + .shardSpec(new NumberedShardSpec(0, 2)) + .size(30) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-01/P1D")) + .version("1") + .shardSpec(new NumberedShardSpec(1, 2)) + .size(30) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-02/P1D")) + .version("1") + .shardSpec(new NumberedShardSpec(0, 2)) + .size(20) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-02/P1D")) + .version("1") + .shardSpec(new NumberedShardSpec(1, 2)) + .size(20) + .build() + ); + + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-01/2012-01-03")))), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of(Pair.of("foo", ImmutableList.of(new Interval("2012-01-01/2012-01-03")))), + merge(segments, false, false) + ); + } + + @Test + public void testMergeMixedShardSpecs() + { + final ObjectMapper mapper = new DefaultObjectMapper(); + final List segments = ImmutableList.of( + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-01/P1D")) + .version("1") + .shardSpec(new NumberedShardSpec(0, 2)) + .size(25) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-01/P1D")) + .version("1") + .shardSpec(new NumberedShardSpec(1, 2)) + .size(25) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-03/P1D")) + .version("1") + .size(50) + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-04/P1D")) + .version("1") + .shardSpec(new LinearShardSpec(1500)) + .size(100) + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-05/P1D")) + .version("1") + .shardSpec(new NumberedShardSpec(0, 1500)) + .size(1) + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-06/P1D")) + .version("1") + .shardSpec(new HashBasedNumberedShardSpec(0, 2, null, mapper)) + .size(25) + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-06/P1D")) + .version("1") + .shardSpec(new HashBasedNumberedShardSpec(1, 2, null, mapper)) + .size(25) + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-07/P1D")) + .version("1") + .shardSpec(new SingleDimensionShardSpec("dim", null, "a", 0)) + .size(25) + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-07/P1D")) + .version("1") + .shardSpec(new SingleDimensionShardSpec("dim", "a", "b", 1)) + .size(25) + .build(), + DataSegment.builder().dataSource("foo") + .interval(new Interval("2012-01-07/P1D")) + .version("1") + .shardSpec(new SingleDimensionShardSpec("dim", "b", null, 2)) + .size(25) + .build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-01/2012-01-04"), new Interval("2012-01-06/2012-01-08")) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-03/2012-01-05"), new Interval("2012-01-06/2012-01-08")) + ) + ), + merge(segments, true, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-06/2012-01-08"), new Interval("2012-01-01/2012-01-04")) + ) + ), + merge(segments, false, false) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of(new Interval("2012-01-06/2012-01-08")) + ) + ), + merge(segments, true, false) + ); + } + + /** + * Segment timeline + *

+ * Day: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 + * Shard0: |_O_|_O_|_O_|_O_|_S_|_S_|_S_|_O_|_S_|_O_|_S_|_O_|_S_| + * Shard1: |_S_| |_S_| |_S_|_S_| + * Shard2: |_S_| |_S_| |_S_| + */ + @Test + public void testMergeComplex() + { + final ObjectMapper mapper = new DefaultObjectMapper(); + final List segments = ImmutableList.of( + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-01/P1D")).version("2").size(120).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-02/P1D")).version("2").size(120).build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-03/P1D")) + .version("3") + .shardSpec(new NumberedShardSpec(0, 3)) + .size(500) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-03/P1D")) + .version("3") + .shardSpec(new NumberedShardSpec(1, 3)) + .size(20) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-03/P1D")) + .version("3") + .shardSpec(new NumberedShardSpec(2, 3)) + .size(20) + .build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-04/P5D")).version("1").size(500).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-05/P1D")).version("3").size(50).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-06/P1D")).version("3").size(50).build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-07/P1D")).version("2").size(80).build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-09/P1D")) + .version("2") + .shardSpec(new SingleDimensionShardSpec("dim", null, "a", 0)) + .size(10) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-09/P1D")) + .version("2") + .shardSpec(new SingleDimensionShardSpec("dim", "a", "b", 1)) + .size(10) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-09/P1D")) + .version("2") + .shardSpec(new SingleDimensionShardSpec("dim", "b", null, 2)) + .size(10) + .build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-10/P1D")).version("2").size(100).build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-11/P1D")) + .version("2") + .shardSpec(new HashBasedNumberedShardSpec(0, 3, null, mapper)) + .size(50) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-11/P1D")) + .version("2") + .shardSpec(new HashBasedNumberedShardSpec(1, 3, null, mapper)) + .size(50) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-11/P1D")) + .version("2") + .shardSpec(new HashBasedNumberedShardSpec(2, 3, null, mapper)) + .size(50) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-12/P1D")) + .version("2") + .shardSpec(new NumberedShardSpec(0, 2)) + .size(100) + .build(), + DataSegment.builder() + .dataSource("foo") + .interval(new Interval("2012-01-12/P1D")) + .version("2") + .shardSpec(new NumberedShardSpec(1, 2)) + .size(1) + .build(), + DataSegment.builder().dataSource("foo").interval(new Interval("2012-01-13/P1D")).version("2").size(50).build() + ); + + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of( + new Interval("2012-01-03/2012-01-04"), + new Interval("2012-01-05/2012-01-07"), + new Interval("2012-01-07/2012-01-09"), + new Interval("2012-01-09/2012-01-11"), + new Interval("2012-01-11/2012-01-12"), + new Interval("2012-01-12/2012-01-13") + ) + ) + ), + merge(segments, false, true) + ); + Assert.assertEquals( + ImmutableList.of( + Pair.of( + "foo", + ImmutableList.of( + new Interval("2012-01-12/2012-01-14"), + new Interval("2012-01-11/2012-01-12"), + new Interval("2012-01-08/2012-01-10"), + new Interval("2012-01-06/2012-01-08"), + new Interval("2012-01-04/2012-01-06"), + new Interval("2012-01-03/2012-01-04") + ) + ) + ), + merge(segments, false, false) + ); + } + + + /** + * Runs DruidCoordinatorHadoopSegmentMerger on a particular set of segments and returns the list of unbalanced + * sections that should be reindexed. + */ + private static List>> merge( + final Collection segments, + final boolean keepSegmentGapDuringMerge, + boolean scanFromOldToNew + ) + { + final List>> retVal = Lists.newArrayList(); + final IndexingServiceClient indexingServiceClient = new IndexingServiceClient(null, null, null) + { + + @Override + public String hadoopMergeSegments( + String dataSource, + List intervalsToReindex, + AggregatorFactory[] aggregators, + QueryGranularity queryGranularity, + List dimensions, + Map tuningConfig, + List hadoopCoordinates + ) + { + retVal.add(Pair.of(dataSource, intervalsToReindex)); + return null; + } + + @Override + public List> getIncompleteTasks() + { + return ImmutableList.of(); + } + }; + + final JacksonConfigManager configManager = EasyMock.createMock(JacksonConfigManager.class); + EasyMock.expect(configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class)) + .andReturn(new AtomicReference(null)).anyTimes(); + EasyMock.replay(configManager); + + final DruidCoordinatorHadoopSegmentMerger merger = new DruidCoordinatorHadoopSegmentMerger( + indexingServiceClient, + configManager, + scanFromOldToNew + ); + final DruidCoordinatorRuntimeParams params = + DruidCoordinatorRuntimeParams.newBuilder() + .withAvailableSegments(ImmutableSet.copyOf(segments)) + .withDynamicConfigs( + new CoordinatorDynamicConfig.Builder() + .withMergeBytesLimit(mergeBytesLimit) + .withhadoopMergeConfig( + new DruidCoordinatorHadoopMergeConfig( + keepSegmentGapDuringMerge, + null, + null, + ImmutableList.of( + new CoordinatorHadoopMergeSpec( + "foo", + null, + null, + null + )) + )) + .build()) + .withEmitter(EasyMock.createMock(ServiceEmitter.class)) + .build(); + merger.run(params); + return retVal; + } +} diff --git a/server/src/test/java/io/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/io/druid/server/http/CoordinatorDynamicConfigTest.java index af5bb477eccf..c58179253826 100644 --- a/server/src/test/java/io/druid/server/http/CoordinatorDynamicConfigTest.java +++ b/server/src/test/java/io/druid/server/http/CoordinatorDynamicConfigTest.java @@ -20,9 +20,16 @@ package io.druid.server.http; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import io.druid.granularity.QueryGranularity; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.query.aggregation.CountAggregatorFactory; +import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.segment.TestHelper; import io.druid.server.coordinator.CoordinatorDynamicConfig; +import io.druid.server.coordinator.CoordinatorHadoopMergeSpec; +import io.druid.server.coordinator.DruidCoordinatorHadoopMergeConfig; import org.junit.Assert; import org.junit.Test; @@ -41,8 +48,67 @@ public void testSerde() throws Exception + " \"replicantLifetime\": 1,\n" + " \"replicationThrottleLimit\": 1,\n" + " \"emitBalancingStats\": true,\n" - + " \"killDataSourceWhitelist\": [\"test\"]\n" - + "}\n"; + + " \"killDataSourceWhitelist\": [\"test\"],\n" + + " \"hadoopMergeConfig\":{\n" + + " \"keepGap\":true,\n" + + " \"hadoopDependencyCoordinates\":[\"org.apache.hadoop:hadoop-client:2.3.0\"],\n" + + " \"tuningConfig\":null,\n" + + " \"hadoopMergeSpecs\":[\n" + + " {\n" + + " \"dataSource\":\"wiki\",\n" + + " \"queryGranularity\":null,\n" + + " \"dimensions\":null,\n" + + " \"metricsSpec\":[\n" + + " {\n" + + " \"type\":\"count\",\n" + + " \"name\":\"count\"\n" + + " },\n" + + " {\n" + + " \"type\":\"doubleSum\",\n" + + " \"name\":\"added\",\n" + + " \"fieldName\":\"added\"\n" + + " },\n" + + " {\n" + + " \"type\":\"doubleSum\",\n" + + " \"name\":\"deleted\",\n" + + " \"fieldName\":\"deleted\"\n" + + " },\n" + + " {\n" + + " \"type\":\"doubleSum\",\n" + + " \"name\":\"delta\",\n" + + " \"fieldName\":\"delta\"\n" + + " }\n" + + " ]\n" + + " },\n" + + " {\n" + + " \"dataSource\":\"wikipedia\",\n" + + " \"queryGranularity\":\"DAY\",\n" + + " \"dimensions\":[\"language\"],\n" + + " \"metricsSpec\":[\n" + + " {\n" + + " \"type\":\"count\",\n" + + " \"name\":\"count\"\n" + + " },\n" + + " {\n" + + " \"type\":\"doubleSum\",\n" + + " \"name\":\"added\",\n" + + " \"fieldName\":\"added\"\n" + + " },\n" + + " {\n" + + " \"type\":\"doubleSum\",\n" + + " \"name\":\"deleted\",\n" + + " \"fieldName\":\"deleted\"\n" + + " },\n" + + " {\n" + + " \"type\":\"doubleSum\",\n" + + " \"name\":\"delta\",\n" + + " \"fieldName\":\"delta\"\n" + + " }\n" + + " ]\n" + + " }\n" + + " ]\n" + + " }\n" + + " }\n"; ObjectMapper mapper = TestHelper.getObjectMapper(); CoordinatorDynamicConfig actual = mapper.readValue( @@ -56,7 +122,46 @@ public void testSerde() throws Exception ); Assert.assertEquals( - new CoordinatorDynamicConfig(1, 1, 1, 1, 1, 1, 1, true, ImmutableSet.of("test")), + new CoordinatorDynamicConfig( + 1, + 1, + 1, + 1, + 1, + 1, + 1, + true, + ImmutableSet.of("test"), + new DruidCoordinatorHadoopMergeConfig( + true, + ImmutableList.of("org.apache.hadoop:hadoop-client:2.3.0"), + null, + ImmutableList.of( + new CoordinatorHadoopMergeSpec( + "wiki", + null, + null, + ImmutableList.of( + new CountAggregatorFactory("count"), + new DoubleSumAggregatorFactory("added", "added"), + new DoubleSumAggregatorFactory("deleted", "deleted"), + new DoubleSumAggregatorFactory("delta", "delta") + ).toArray(new AggregatorFactory[4]) + ), + new CoordinatorHadoopMergeSpec( + "wikipedia", + QueryGranularity.DAY, + ImmutableList.of("language"), + ImmutableList.of( + new CountAggregatorFactory("count"), + new DoubleSumAggregatorFactory("added", "added"), + new DoubleSumAggregatorFactory("deleted", "deleted"), + new DoubleSumAggregatorFactory("delta", "delta") + ).toArray(new AggregatorFactory[4]) + ) + ) + ) + ), actual ); } @@ -65,7 +170,7 @@ public void testSerde() throws Exception public void testBuilderDefaults() { Assert.assertEquals( - new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null), + new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null, null), new CoordinatorDynamicConfig.Builder().build() ); } @@ -73,8 +178,86 @@ public void testBuilderDefaults() @Test public void testEqualsAndHashCodeSanity() { - CoordinatorDynamicConfig config1 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null); - CoordinatorDynamicConfig config2 = new CoordinatorDynamicConfig(900000, 524288000, 100, 5, 15, 10, 1, false, null); + CoordinatorDynamicConfig config1 = new CoordinatorDynamicConfig( + 900000, + 524288000, + 100, + 5, + 15, + 10, + 1, + false, + null, + new DruidCoordinatorHadoopMergeConfig( + true, + ImmutableList.of("org.apache.hadoop:hadoop-client:2.3.0"), + null, + ImmutableList.of( + new CoordinatorHadoopMergeSpec( + "wiki", + null, + null, + ImmutableList.of( + new CountAggregatorFactory("count"), + new DoubleSumAggregatorFactory("added", "added"), + new DoubleSumAggregatorFactory("deleted", "deleted"), + new DoubleSumAggregatorFactory("delta", "delta") + ).toArray(new AggregatorFactory[4]) + ), + new CoordinatorHadoopMergeSpec( + "wikipedia", + QueryGranularity.DAY, + ImmutableList.of("language"), + ImmutableList.of( + new CountAggregatorFactory("count"), + new DoubleSumAggregatorFactory("added", "added"), + new DoubleSumAggregatorFactory("deleted", "deleted"), + new DoubleSumAggregatorFactory("delta", "delta") + ).toArray(new AggregatorFactory[4]) + ) + ) + ) + ); + CoordinatorDynamicConfig config2 = new CoordinatorDynamicConfig( + 900000, + 524288000, + 100, + 5, + 15, + 10, + 1, + false, + null, + new DruidCoordinatorHadoopMergeConfig( + true, + ImmutableList.of("org.apache.hadoop:hadoop-client:2.3.0"), + null, + ImmutableList.of( + new CoordinatorHadoopMergeSpec( + "wiki", + null, + null, + ImmutableList.of( + new CountAggregatorFactory("count"), + new DoubleSumAggregatorFactory("added", "added"), + new DoubleSumAggregatorFactory("deleted", "deleted"), + new DoubleSumAggregatorFactory("delta", "delta") + ).toArray(new AggregatorFactory[4]) + ), + new CoordinatorHadoopMergeSpec( + "wikipedia", + QueryGranularity.DAY, + ImmutableList.of("language"), + ImmutableList.of( + new CountAggregatorFactory("count"), + new DoubleSumAggregatorFactory("added", "added"), + new DoubleSumAggregatorFactory("deleted", "deleted"), + new DoubleSumAggregatorFactory("delta", "delta") + ).toArray(new AggregatorFactory[4]) + ) + ) + ) + ); Assert.assertEquals(config1, config2); Assert.assertEquals(config1.hashCode(), config2.hashCode()); diff --git a/services/src/main/java/io/druid/cli/CliCoordinator.java b/services/src/main/java/io/druid/cli/CliCoordinator.java index b66bd744ed48..f74418152ef0 100644 --- a/services/src/main/java/io/druid/cli/CliCoordinator.java +++ b/services/src/main/java/io/druid/cli/CliCoordinator.java @@ -53,6 +53,7 @@ import io.druid.server.coordinator.DruidCoordinator; import io.druid.server.coordinator.DruidCoordinatorConfig; import io.druid.server.coordinator.LoadQueueTaskMaster; +import io.druid.server.coordinator.helper.DruidCoordinatorHadoopSegmentMerger; import io.druid.server.coordinator.helper.DruidCoordinatorHelper; import io.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller; import io.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger; @@ -178,9 +179,13 @@ public void configure(Binder binder) DruidCoordinatorHelper.class, CoordinatorIndexingServiceHelper.class ).addConditionBinding( - "druid.coordinator.merge.on", - Predicates.equalTo("true"), + "druid.coordinator.merge.strategy", + Predicates.equalTo("append"), DruidCoordinatorSegmentMerger.class + ).addConditionBinding( + "druid.coordinator.merge.strategy", + Predicates.equalTo("hadoop"), + DruidCoordinatorHadoopSegmentMerger.class ).addConditionBinding( "druid.coordinator.conversion.on", Predicates.equalTo("true"),