diff --git a/docs/api-reference/tasks-api.md b/docs/api-reference/tasks-api.md index dbd8fa7d5425..d94be5b0c5fd 100644 --- a/docs/api-reference/tasks-api.md +++ b/docs/api-reference/tasks-api.md @@ -914,13 +914,10 @@ Host: http://ROUTER_IP:ROUTER_PORT ### Get task segments :::info - This API is deprecated and will be removed in future releases. + This API is not supported anymore and always returns a 404 response. + Use the metric `segment/added/bytes` instead to identify the segment IDs committed by a task. ::: -Retrieves information about segments generated by the task given the task ID. To hit this endpoint, make sure to enable the audit log config on the Overlord with `druid.indexer.auditLog.enabled = true`. - -In addition to enabling audit logs, configure a cleanup strategy to prevent overloading the metadata store with old audit logs which may cause performance issues. To enable automated cleanup of audit logs on the Coordinator, set `druid.coordinator.kill.audit.on`. You may also manually export the audit logs to external storage. For more information, see [Audit records](../operations/clean-metadata-store.md#audit-records). - #### URL `GET` `/druid/indexer/v1/task/{taskId}/segments` @@ -929,12 +926,14 @@ In addition to enabling audit logs, configure a cleanup strategy to prevent over - + -
- -*Successfully retrieved task segments* +```json +{ + "error": "Segment IDs committed by a task action are not persisted anymore. Use the metric 'segment/added/bytes' to identify the segments created by a task." +} +```
diff --git a/docs/operations/clean-metadata-store.md b/docs/operations/clean-metadata-store.md index 49f2555c9d27..80e3494a53d1 100644 --- a/docs/operations/clean-metadata-store.md +++ b/docs/operations/clean-metadata-store.md @@ -44,7 +44,7 @@ This applies to all metadata entities in this topic except compaction configurat You can configure the retention period for each metadata type, when available, through the record's `durationToRetain` property. Certain records may require additional conditions be satisfied before clean up occurs. -See the [example](#example) for how you can customize the automated metadata cleanup for a specific use case. +See the [example](#example-configuration-for-automated-metadata-cleanup) for how you can customize the automated metadata cleanup for a specific use case. ## Automated cleanup strategies @@ -62,13 +62,12 @@ You can configure cleanup for each entity separately, as described in this secti Define the properties in the `coordinator/runtime.properties` file. The cleanup of one entity may depend on the cleanup of another entity as follows: -- You have to configure a [kill task for segment records](#kill-task) before you can configure automated cleanup for [rules](#rules-records) or [compaction configuration](#compaction-configuration-records). +- You have to configure a [kill task for segment records](#segment-records-and-segments-in-deep-storage-kill-task) before you can configure automated cleanup for [rules](#rules-records) or [compaction configuration](#compaction-configuration-records). - You have to schedule the metadata management tasks to run at the same or higher frequency as your most frequent cleanup job. For example, if your most frequent cleanup job is every hour, set the metadata store management period to one hour or less: `druid.coordinator.period.metadataStoreManagementPeriod=P1H`. For details on configuration properties, see [Metadata management](../configuration/index.md#metadata-management). -If you want to skip the details, check out the [example](#example) for configuring automated metadata cleanup. +If you want to skip the details, check out the [example](#example-configuration-for-automated-metadata-cleanup) for configuring automated metadata cleanup. - ### Segment records and segments in deep storage (kill task) :::info @@ -110,7 +109,7 @@ Supervisor cleanup uses the following configuration: ### Rules records -Rule records become eligible for deletion when all segments for the datasource have been killed by the kill task and the `durationToRetain` time has passed since their creation. Automated cleanup for rules requires a [kill task](#kill-task). +Rule records become eligible for deletion when all segments for the datasource have been killed by the kill task and the `durationToRetain` time has passed since their creation. Automated cleanup for rules requires a [kill task](#segment-records-and-segments-in-deep-storage-kill-task). Rule cleanup uses the following configuration: - `druid.coordinator.kill.rule.on`: When `true`, enables cleanup for rules records. @@ -129,7 +128,7 @@ To prevent the configuration from being prematurely removed, wait for the dataso Unlike other metadata records, compaction configuration records do not have a retention period set by `durationToRetain`. Druid deletes compaction configuration records at every cleanup cycle for inactive datasources, which do not have segments either used or unused. -Compaction configuration records in the `druid_config` table become eligible for deletion after all segments for the datasource have been killed by the kill task. Automated cleanup for compaction configuration requires a [kill task](#kill-task). +Compaction configuration records in the `druid_config` table become eligible for deletion after all segments for the datasource have been killed by the kill task. Automated cleanup for compaction configuration requires a [kill task](#segment-records-and-segments-in-deep-storage-kill-task). Compaction configuration cleanup uses the following configuration: - `druid.coordinator.kill.compaction.on`: When `true`, enables cleanup for compaction configuration records. @@ -153,7 +152,7 @@ Datasource cleanup uses the following configuration: You can configure the Overlord to periodically delete indexer task logs and associated metadata. During cleanup, the Overlord removes the following: * Indexer task logs from deep storage. -* Indexer task log metadata from the tasks and tasklogs tables in [metadata storage](../configuration/index.md#metadata-storage) (named `druid_tasks` and `druid_tasklogs` by default). Druid no longer uses the tasklogs table, and the table is always empty. +* Indexer task log metadata from the tasks table in [metadata storage](../configuration/index.md#metadata-storage) (named `druid_tasks` by default). To configure cleanup of task logs by the Overlord, set the following properties in the `overlord/runtime.properties` file. @@ -188,7 +187,6 @@ druid.coordinator.kill.rule.on=false druid.coordinator.kill.datasource.on=false ``` - ## Example configuration for automated metadata cleanup Consider a scenario where you have scripts to create and delete hundreds of datasources and related entities a day. You do not want to fill your metadata store with leftover records. The datasources and related entities tend to persist for only one or two days. Therefore, you want to run a cleanup job that identifies and removes leftover records that are at least four days old after a seven day buffer period in case you want to recover the data. The exception is for audit logs, which you need to retain for 30 days: diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java index 276a9fe22708..e319dc242660 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/CheckPointDataSourceMetadataAction.java @@ -121,12 +121,6 @@ public Boolean perform(Task task, TaskActionToolbox toolbox) ); } - @Override - public boolean isAudited() - { - return true; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java index bd0be9f74114..1d0059335ed1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClient.java @@ -22,8 +22,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.common.task.IndexTaskUtils; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskStorage; -import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; @@ -37,45 +35,21 @@ public class LocalTaskActionClient implements TaskActionClient private static final EmittingLogger log = new EmittingLogger(LocalTaskActionClient.class); private final Task task; - private final TaskStorage storage; private final TaskActionToolbox toolbox; - private final TaskAuditLogConfig auditLogConfig; public LocalTaskActionClient( Task task, - TaskStorage storage, - TaskActionToolbox toolbox, - TaskAuditLogConfig auditLogConfig + TaskActionToolbox toolbox ) { this.task = task; - this.storage = storage; this.toolbox = toolbox; - this.auditLogConfig = auditLogConfig; } @Override public RetType submit(TaskAction taskAction) { log.debug("Performing action for task[%s]: %s", task.getId(), taskAction); - - if (auditLogConfig.isEnabled() && taskAction.isAudited()) { - // Add audit log - try { - final long auditLogStartTime = System.currentTimeMillis(); - storage.addAuditLog(task, taskAction); - emitTimerMetric("task/action/log/time", taskAction, System.currentTimeMillis() - auditLogStartTime); - } - catch (Exception e) { - final String actionClass = taskAction.getClass().getName(); - log.makeAlert(e, "Failed to record action in audit log") - .addData("task", task.getId()) - .addData("actionClass", actionClass) - .emit(); - throw new ISE(e, "Failed to record action [%s] in audit log", actionClass); - } - } - final long performStartTime = System.currentTimeMillis(); final RetType result = performAction(taskAction); emitTimerMetric("task/action/run/time", taskAction, System.currentTimeMillis() - performStartTime); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClientFactory.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClientFactory.java index 60fc01a2390d..c485b1660380 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClientFactory.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LocalTaskActionClientFactory.java @@ -21,27 +21,22 @@ import com.google.inject.Inject; import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.TaskStorage; /** */ public class LocalTaskActionClientFactory implements TaskActionClientFactory { - private final TaskStorage storage; private final TaskActionToolbox toolbox; - private final TaskAuditLogConfig auditLogConfig; @Inject - public LocalTaskActionClientFactory(TaskStorage storage, TaskActionToolbox toolbox, TaskAuditLogConfig auditLogConfig) + public LocalTaskActionClientFactory(TaskActionToolbox toolbox) { - this.storage = storage; this.toolbox = toolbox; - this.auditLogConfig = auditLogConfig; } @Override public TaskActionClient create(Task task) { - return new LocalTaskActionClient(task, storage, toolbox, auditLogConfig); + return new LocalTaskActionClient(task, toolbox); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LockListAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LockListAction.java index 22af74f26b54..c60c57407984 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LockListAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LockListAction.java @@ -39,12 +39,6 @@ public List perform(Task task, TaskActionToolbox toolbox) return toolbox.getTaskLockbox().findLocksForTask(task); } - @Override - public boolean isAudited() - { - return false; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LockReleaseAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LockReleaseAction.java index fa8e1d6bea24..32974117cc7b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LockReleaseAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/LockReleaseAction.java @@ -56,12 +56,6 @@ public Void perform(Task task, TaskActionToolbox toolbox) return null; } - @Override - public boolean isAudited() - { - return false; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java index 93cb75280fac..35a8ed3e35c0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/MarkSegmentsAsUnusedAction.java @@ -67,9 +67,4 @@ public Integer perform(Task task, TaskActionToolbox toolbox) .markSegmentsAsUnusedWithinInterval(dataSource, interval); } - @Override - public boolean isAudited() - { - return true; - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/ResetDataSourceMetadataAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/ResetDataSourceMetadataAction.java index c853a00c58da..42259b381830 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/ResetDataSourceMetadataAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/ResetDataSourceMetadataAction.java @@ -64,12 +64,6 @@ public Boolean perform(Task task, TaskActionToolbox toolbox) return toolbox.getSupervisorManager().resetSupervisor(dataSource, resetMetadata); } - @Override - public boolean isAudited() - { - return true; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsByIdAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsByIdAction.java index 88d3703f4b09..a4ca90ac62ed 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsByIdAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveSegmentsByIdAction.java @@ -74,12 +74,6 @@ public Set perform(Task task, TaskActionToolbox toolbox) .retrieveSegmentsById(dataSource, segmentIds); } - @Override - public boolean isAudited() - { - return false; - } - @Override public boolean equals(Object o) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java index 3f8d4725835a..fb58328b3d76 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUnusedSegmentsAction.java @@ -101,12 +101,6 @@ public List perform(Task task, TaskActionToolbox toolbox) .retrieveUnusedSegmentsForInterval(dataSource, interval, versions, limit, maxUsedStatusLastUpdatedTime); } - @Override - public boolean isAudited() - { - return false; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java index a107795864ce..473976efd1ea 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/RetrieveUsedSegmentsAction.java @@ -186,12 +186,6 @@ private Set retrieveUsedSegments(TaskActionToolbox toolbox) .retrieveUsedSegmentsForIntervals(dataSource, intervals, visibility); } - @Override - public boolean isAudited() - { - return false; - } - @Override public boolean equals(Object o) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java index d0308516e04b..902dad5dd879 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentAllocateAction.java @@ -386,12 +386,6 @@ private SegmentIdWithShardSpec tryAllocate( } } - @Override - public boolean isAudited() - { - return false; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentInsertAction.java deleted file mode 100644 index 478e0b89d3d9..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentInsertAction.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.actions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.core.type.TypeReference; -import com.google.common.collect.ImmutableSet; -import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.segment.SegmentSchemaMapping; -import org.apache.druid.segment.SegmentUtils; -import org.apache.druid.timeline.DataSegment; - -import javax.annotation.Nullable; -import java.util.Set; - -/** - * Word of warning: Very large "segments" sets can cause oversized audit log entries, which is bad because it means - * that the task cannot actually complete. Callers should avoid this by avoiding inserting too many segments in the - * same action. - */ -public class SegmentInsertAction implements TaskAction> -{ - private final Set segments; - - @Nullable - private final SegmentSchemaMapping segmentSchemaMapping; - - @JsonCreator - public SegmentInsertAction( - @JsonProperty("segments") Set segments, - @JsonProperty("segmentSchemaMapping") @Nullable SegmentSchemaMapping segmentSchemaMapping - ) - { - this.segments = ImmutableSet.copyOf(segments); - this.segmentSchemaMapping = segmentSchemaMapping; - } - - @JsonProperty - public Set getSegments() - { - return segments; - } - - @JsonProperty - @Nullable - public SegmentSchemaMapping getSegmentSchemaMapping() - { - return segmentSchemaMapping; - } - - @Override - public TypeReference> getReturnTypeReference() - { - return new TypeReference>() - { - }; - } - - /** - * Behaves similarly to - * {@link org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator#commitSegments}, - * with startMetadata and endMetadata both null. - */ - @Override - public Set perform(Task task, TaskActionToolbox toolbox) - { - return SegmentTransactionalInsertAction.appendAction(segments, null, null, segmentSchemaMapping).perform(task, toolbox).getSegments(); - } - - @Override - public boolean isAudited() - { - return true; - } - - @Override - public String toString() - { - return "SegmentInsertAction{" + - "segments=" + SegmentUtils.commaSeparatedIdentifiers(segments) + - '}'; - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockAcquireAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockAcquireAction.java index 70c81225b2f5..e17af3847ca8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockAcquireAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockAcquireAction.java @@ -119,12 +119,6 @@ public LockResult perform(Task task, TaskActionToolbox toolbox) } } - @Override - public boolean isAudited() - { - return false; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockTryAcquireAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockTryAcquireAction.java index 7728574c0756..c5084eb61cda 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockTryAcquireAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockTryAcquireAction.java @@ -102,12 +102,6 @@ public List perform(Task task, TaskActionToolbox toolbox) .collect(Collectors.toList()); } - @Override - public boolean isAudited() - { - return false; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentMetadataUpdateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentMetadataUpdateAction.java index cbad8bfcc5c2..b2a5ee5843be 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentMetadataUpdateAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentMetadataUpdateAction.java @@ -99,12 +99,6 @@ public Void perform(Task task, TaskActionToolbox toolbox) return null; } - @Override - public boolean isAudited() - { - return true; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java index 2856f161e0e6..326907c8f0a5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentNukeAction.java @@ -101,12 +101,6 @@ public Void perform(Task task, TaskActionToolbox toolbox) return null; } - @Override - public boolean isAudited() - { - return true; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java index 4871e65e162c..2326176885b3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalAppendAction.java @@ -44,7 +44,6 @@ import java.util.stream.Collectors; /** - * * Append segments to metadata storage. The segment versions must all be less than or equal to a lock held by * your task for the segment intervals. * @@ -209,12 +208,6 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) return retVal; } - @Override - public boolean isAudited() - { - return true; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java index c2f542b096e1..e8dd472cf31d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalInsertAction.java @@ -304,12 +304,6 @@ private static Map> groupSegmentsByIntervalAndSort(S return segmentsMap; } - @Override - public boolean isAudited() - { - return true; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java index df188ac81533..572f4aa3f284 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentTransactionalReplaceAction.java @@ -187,12 +187,6 @@ private void registerUpgradedPendingSegmentsOnSupervisor( ); } - @Override - public boolean isAudited() - { - return true; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SurrogateAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SurrogateAction.java index 700220bde3a7..2dbc383c3259 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SurrogateAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SurrogateAction.java @@ -73,12 +73,6 @@ public ReturnType perform(Task task, TaskActionToolbox toolbox) } } - @Override - public boolean isAudited() - { - return taskAction.isAudited(); - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java index 4606bd597a8d..6fed872a0b2d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAction.java @@ -34,7 +34,6 @@ @JsonSubTypes.Type(name = "segmentLockAcquire", value = SegmentLockAcquireAction.class), @JsonSubTypes.Type(name = "lockList", value = LockListAction.class), @JsonSubTypes.Type(name = "lockRelease", value = LockReleaseAction.class), - @JsonSubTypes.Type(name = "segmentInsertion", value = SegmentInsertAction.class), @JsonSubTypes.Type(name = "segmentTransactionalInsert", value = SegmentTransactionalInsertAction.class), @JsonSubTypes.Type(name = "segmentTransactionalAppend", value = SegmentTransactionalAppendAction.class), @JsonSubTypes.Type(name = "segmentTransactionalReplace", value = SegmentTransactionalReplaceAction.class), @@ -59,8 +58,6 @@ public interface TaskAction RetType perform(Task task, TaskActionToolbox toolbox); - boolean isAudited(); - default boolean canPerformAsync(Task task, TaskActionToolbox toolbox) { return false; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAuditLogConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAuditLogConfig.java deleted file mode 100644 index c78aec4f39b5..000000000000 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskAuditLogConfig.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.actions; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - -/** - * The configuration for task audit logging. - * This class will be removed in future releases. See https://github.com/apache/druid/issues/5859. - */ -@Deprecated -public class TaskAuditLogConfig -{ - @JsonProperty - private final boolean enabled; - - @JsonCreator - public TaskAuditLogConfig(@JsonProperty("enabled") boolean enabled) - { - this.enabled = enabled; - } - - @JsonProperty("enabled") - public boolean isEnabled() - { - return enabled; - } -} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TimeChunkLockAcquireAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TimeChunkLockAcquireAction.java index 305a0d7b2843..42f4e40d0e64 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TimeChunkLockAcquireAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TimeChunkLockAcquireAction.java @@ -97,12 +97,6 @@ public TaskLock perform(Task task, TaskActionToolbox toolbox) } } - @Override - public boolean isAudited() - { - return false; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TimeChunkLockTryAcquireAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TimeChunkLockTryAcquireAction.java index 07bcfa5c1011..b2dc6e3d5e62 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TimeChunkLockTryAcquireAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TimeChunkLockTryAcquireAction.java @@ -81,12 +81,6 @@ public TaskLock perform(Task task, TaskActionToolbox toolbox) return result.getTaskLock(); } - @Override - public boolean isAudited() - { - return false; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java index dce9f00c35b4..2c7c265a7f40 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateLocationAction.java @@ -63,12 +63,6 @@ public Void perform(Task task, TaskActionToolbox toolbox) return null; } - @Override - public boolean isAudited() - { - return true; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java index 2ff8375b7f68..d02020acb9d0 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/UpdateStatusAction.java @@ -85,12 +85,6 @@ public Void perform(Task task, TaskActionToolbox toolbox) return null; } - @Override - public boolean isAudited() - { - return true; - } - @Override public String toString() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java index e84976db86c4..5109abe2377d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/HeapMemoryTaskStorage.java @@ -327,24 +327,6 @@ public void removeTasksOlderThan(final long timestamp) } } - @Deprecated - @Override - public void addAuditLog(Task task, TaskAction taskAction) - { - synchronized (taskActions) { - taskActions.put(task.getId(), taskAction); - } - } - - @Deprecated - @Override - public List getAuditLogs(String taskid) - { - synchronized (taskActions) { - return ImmutableList.copyOf(taskActions.get(taskid)); - } - } - private static class TaskStuff { final Task task; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java index 15730d48bb1f..3608f0238992 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/MetadataTaskStorage.java @@ -75,14 +75,6 @@ public TypeReference getStatusType() }; } - @Override - public TypeReference getLogType() - { - return new TypeReference() - { - }; - } - @Override public TypeReference getLockType() { @@ -319,24 +311,6 @@ public List getLocks(String taskid) ); } - @Deprecated - @Override - public void addAuditLog(final Task task, final TaskAction taskAction) - { - Preconditions.checkNotNull(taskAction, "taskAction"); - - log.info("Logging action for task[%s]: %s", task.getId(), taskAction); - - handler.addLog(task.getId(), taskAction); - } - - @Deprecated - @Override - public List getAuditLogs(final String taskId) - { - return handler.getLogs(taskId); - } - private Map getLocksWithIds(final String taskid) { return handler.getLocks(taskid); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java index 235c763e1f71..b231b3f37c28 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorage.java @@ -24,7 +24,6 @@ import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; import org.apache.druid.indexing.common.TaskLock; -import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; @@ -110,26 +109,6 @@ public interface TaskStorage @Nullable TaskInfo getTaskInfo(String taskId); - /** - * Add an action taken by a task to the audit log. - * - * @param task task to record action for - * @param taskAction task action to record - * @param task action return type - */ - @Deprecated - void addAuditLog(Task task, TaskAction taskAction); - - /** - * Returns all actions taken by a task. - * - * @param taskid task ID - * - * @return list of task actions - */ - @Deprecated - List getAuditLogs(String taskid); - /** * Returns a list of currently running or pending tasks as stored in the storage facility. No particular order * is guaranteed, but implementations are encouraged to return tasks in ascending order of creation. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java index ba2ca3c7066a..8a454f5a231b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskStorageQueryAdapter.java @@ -24,22 +24,16 @@ import org.apache.druid.indexer.TaskInfo; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexer.TaskStatusPlus; -import org.apache.druid.indexing.common.actions.SegmentInsertAction; -import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; -import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.metadata.LockFilterPolicy; import org.apache.druid.metadata.TaskLookup; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.TaskLookupType; -import org.apache.druid.timeline.DataSegment; import org.joda.time.Interval; import javax.annotation.Nullable; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; /** * Wraps a {@link TaskStorage}, providing a useful collection of read-only methods. @@ -126,27 +120,4 @@ public TaskInfo getTaskInfo(String taskId) return storage.getTaskInfo(taskId); } - /** - * Returns all segments created by this task. - * - * This method is useful when you want to figure out all of the things a single task spawned. It does pose issues - * with the result set perhaps growing boundlessly and we do not do anything to protect against that. Use at your - * own risk and know that at some point, we might adjust this to actually enforce some sort of limits. - * - * @param taskid task ID - * @return set of segments created by the specified task - */ - @Deprecated - public Set getInsertedSegments(final String taskid) - { - final Set segments = new HashSet<>(); - for (final TaskAction action : storage.getAuditLogs(taskid)) { - if (action instanceof SegmentInsertAction) { - segments.addAll(((SegmentInsertAction) action).getSegments()); - } else if (action instanceof SegmentTransactionalInsertAction) { - segments.addAll(((SegmentTransactionalInsertAction) action).getSegments()); - } - } - return segments; - } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java index 56123a561f6b..07155f00462d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java @@ -84,7 +84,6 @@ import org.apache.druid.server.security.ResourceAction; import org.apache.druid.server.security.ResourceType; import org.apache.druid.tasklogs.TaskLogStreamer; -import org.apache.druid.timeline.DataSegment; import org.joda.time.Duration; import org.joda.time.Interval; @@ -400,8 +399,12 @@ public Response getTaskStatus(@PathParam("taskid") String taskid) @ResourceFilters(TaskResourceFilter.class) public Response getTaskSegments(@PathParam("taskid") String taskid) { - final Set segments = taskStorageQueryAdapter.getInsertedSegments(taskid); - return Response.ok().entity(segments).build(); + final String errorMsg = + "Segment IDs committed by a task action are not persisted anymore." + + " Use the metric 'segment/added/bytes' to identify the segments created by a task."; + return Response.status(Status.NOT_FOUND) + .entity(Collections.singletonMap("error", errorMsg)) + .build(); } @POST diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentInsertActionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentInsertActionTest.java deleted file mode 100644 index c8999c2f5d46..000000000000 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/SegmentInsertActionTest.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.common.actions; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import org.apache.druid.indexing.common.TaskLockType; -import org.apache.druid.indexing.common.task.NoopTask; -import org.apache.druid.indexing.common.task.Task; -import org.apache.druid.indexing.overlord.CriticalAction; -import org.apache.druid.indexing.overlord.LockResult; -import org.apache.druid.indexing.overlord.Segments; -import org.apache.druid.indexing.overlord.TimeChunkLockRequest; -import org.apache.druid.java.util.common.Intervals; -import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.partition.LinearShardSpec; -import org.assertj.core.api.Assertions; -import org.hamcrest.CoreMatchers; -import org.joda.time.Interval; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; - -import java.util.Collections; -import java.util.Set; - -public class SegmentInsertActionTest -{ - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Rule - public TaskActionTestKit actionTestKit = new TaskActionTestKit(); - - private static final String DATA_SOURCE = "none"; - private static final Interval INTERVAL = Intervals.of("2020/2020T01"); - private static final String PARTY_YEAR = "1999"; - private static final String THE_DISTANT_FUTURE = "3000"; - - private static final DataSegment SEGMENT1 = new DataSegment( - DATA_SOURCE, - INTERVAL, - PARTY_YEAR, - ImmutableMap.of(), - ImmutableList.of(), - ImmutableList.of(), - new LinearShardSpec(0), - 9, - 1024 - ); - - private static final DataSegment SEGMENT2 = new DataSegment( - DATA_SOURCE, - INTERVAL, - PARTY_YEAR, - ImmutableMap.of(), - ImmutableList.of(), - ImmutableList.of(), - new LinearShardSpec(1), - 9, - 1024 - ); - - private static final DataSegment SEGMENT3 = new DataSegment( - DATA_SOURCE, - INTERVAL, - THE_DISTANT_FUTURE, - ImmutableMap.of(), - ImmutableList.of(), - ImmutableList.of(), - new LinearShardSpec(1), - 9, - 1024 - ); - - private LockResult acquireTimeChunkLock(TaskLockType lockType, Task task, Interval interval, long timeoutMs) - throws InterruptedException - { - return actionTestKit.getTaskLockbox().lock(task, new TimeChunkLockRequest(lockType, task, interval, null), timeoutMs); - } - - @Test - public void testSimple() throws Exception - { - final Task task = NoopTask.create(); - final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT1, SEGMENT2), null); - actionTestKit.getTaskLockbox().add(task); - acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000); - actionTestKit.getTaskLockbox().doInCriticalSection( - task, - Collections.singleton(INTERVAL), - CriticalAction.builder() - .onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox())) - .onInvalidLocks( - () -> { - Assert.fail(); - return null; - } - ) - .build() - ); - - Assertions.assertThat( - actionTestKit.getMetadataStorageCoordinator() - .retrieveUsedSegmentsForInterval(DATA_SOURCE, INTERVAL, Segments.ONLY_VISIBLE) - ).containsExactlyInAnyOrder(SEGMENT1, SEGMENT2); - } - - @Test - public void testFailBadVersion() throws Exception - { - final Task task = NoopTask.create(); - final SegmentInsertAction action = new SegmentInsertAction(ImmutableSet.of(SEGMENT3), null); - actionTestKit.getTaskLockbox().add(task); - acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, INTERVAL, 5000); - - thrown.expect(IllegalStateException.class); - thrown.expectMessage(CoreMatchers.containsString("are not covered by locks")); - final Set segments = actionTestKit.getTaskLockbox().doInCriticalSection( - task, - Collections.singleton(INTERVAL), - CriticalAction.>builder() - .onValidLocks(() -> action.perform(task, actionTestKit.getTaskActionToolbox())) - .onInvalidLocks( - () -> { - Assert.fail(); - return null; - } - ) - .build() - ); - - Assert.assertEquals(ImmutableSet.of(SEGMENT3), segments); - } -} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CountingLocalTaskActionClientForTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CountingLocalTaskActionClientForTest.java index 63380e84fd9e..cad2feb0deb1 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CountingLocalTaskActionClientForTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CountingLocalTaskActionClientForTest.java @@ -24,7 +24,6 @@ import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionToolbox; -import org.apache.druid.indexing.common.actions.TaskAuditLogConfig; import org.apache.druid.indexing.overlord.TaskStorage; import java.util.concurrent.ConcurrentHashMap; @@ -42,7 +41,7 @@ public CountingLocalTaskActionClientForTest( TaskActionToolbox toolbox ) { - delegate = new LocalTaskActionClient(task, storage, toolbox, new TaskAuditLogConfig(false)); + delegate = new LocalTaskActionClient(task, toolbox); } @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java index 133ced3907dc..bf179e0e45ee 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IngestionTestBase.java @@ -43,7 +43,6 @@ import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.TestUtils; -import org.apache.druid.indexing.common.actions.SegmentInsertAction; import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskAction; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -346,7 +345,8 @@ public TaskActionClient create(Task task) public class TestLocalTaskActionClient extends CountingLocalTaskActionClientForTest { private final Set publishedSegments = new HashSet<>(); - private SegmentSchemaMapping segmentSchemaMapping = new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION); + private final SegmentSchemaMapping segmentSchemaMapping + = new SegmentSchemaMapping(CentralizedDatasourceSchemaConfig.SCHEMA_VERSION); private TestLocalTaskActionClient(Task task) { @@ -358,11 +358,9 @@ public RetType submit(TaskAction taskAction) { final RetType result = super.submit(taskAction); if (taskAction instanceof SegmentTransactionalInsertAction) { - publishedSegments.addAll(((SegmentTransactionalInsertAction) taskAction).getSegments()); - segmentSchemaMapping.merge(((SegmentTransactionalInsertAction) taskAction).getSegmentSchemaMapping()); - } else if (taskAction instanceof SegmentInsertAction) { - publishedSegments.addAll(((SegmentInsertAction) taskAction).getSegments()); - segmentSchemaMapping.merge(((SegmentInsertAction) taskAction).getSegmentSchemaMapping()); + SegmentTransactionalInsertAction insertAction = (SegmentTransactionalInsertAction) taskAction; + publishedSegments.addAll(insertAction.getSegments()); + segmentSchemaMapping.merge(insertAction.getSegmentSchemaMapping()); } return result; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java index 68f0ff77bb9d..02d09b6fc6cc 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RealtimeishTask.java @@ -20,14 +20,13 @@ package org.apache.druid.indexing.overlord; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.TaskLock; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.LockListAction; import org.apache.druid.indexing.common.actions.LockReleaseAction; -import org.apache.druid.indexing.common.actions.SegmentInsertAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TimeChunkLockAcquireAction; import org.apache.druid.indexing.common.config.TaskConfig; @@ -38,6 +37,7 @@ import org.joda.time.Interval; import org.junit.Assert; +import java.util.Collections; import java.util.List; /** @@ -97,18 +97,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception Assert.assertEquals("locks2", ImmutableList.of(lock1, lock2), locks2); // Push first segment - SegmentInsertAction firstSegmentInsertAction = new SegmentInsertAction( - ImmutableSet.of( - DataSegment.builder() - .dataSource("foo") - .interval(interval1) - .version(lock1.getVersion()) - .size(0) - .build() - ), - null - ); - toolbox.getTaskActionClient().submit(firstSegmentInsertAction); + toolbox.getTaskActionClient().submit(createSegmentInsertAction(interval1, lock1.getVersion())); // Release first lock toolbox.getTaskActionClient().submit(new LockReleaseAction(interval1)); @@ -118,18 +107,7 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception Assert.assertEquals("locks3", ImmutableList.of(lock2), locks3); // Push second segment - SegmentInsertAction secondSegmentInsertAction = new SegmentInsertAction( - ImmutableSet.of( - DataSegment.builder() - .dataSource("foo") - .interval(interval2) - .version(lock2.getVersion()) - .size(0) - .build() - ), - null - ); - toolbox.getTaskActionClient().submit(secondSegmentInsertAction); + toolbox.getTaskActionClient().submit(createSegmentInsertAction(interval2, lock2.getVersion())); // Release second lock toolbox.getTaskActionClient().submit(new LockReleaseAction(interval2)); @@ -141,4 +119,17 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception // Exit return TaskStatus.success(getId()); } + + private SegmentTransactionalInsertAction createSegmentInsertAction(Interval interval, String version) + { + final DataSegment segmentToInsert + = DataSegment.builder() + .dataSource("foo") + .interval(interval) + .version(version) + .size(0) + .build(); + return SegmentTransactionalInsertAction + .appendAction(Collections.singleton(segmentToInsert), null, null, null); + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 91d74a2bd677..f172eb9da513 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -63,11 +63,10 @@ import org.apache.druid.indexing.common.TestUtils; import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory; import org.apache.druid.indexing.common.actions.LockListAction; -import org.apache.druid.indexing.common.actions.SegmentInsertAction; +import org.apache.druid.indexing.common.actions.SegmentTransactionalInsertAction; import org.apache.druid.indexing.common.actions.TaskActionClient; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.actions.TaskActionToolbox; -import org.apache.druid.indexing.common.actions.TaskAuditLogConfig; import org.apache.druid.indexing.common.actions.TimeChunkLockTryAcquireAction; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfigBuilder; @@ -592,7 +591,6 @@ private TaskToolboxFactory setUpTaskToolboxFactory( taskLockbox = new TaskLockbox(taskStorage, mdc); tac = new LocalTaskActionClientFactory( - taskStorage, new TaskActionToolbox( taskLockbox, taskStorage, @@ -600,8 +598,7 @@ private TaskToolboxFactory setUpTaskToolboxFactory( emitter, EasyMock.createMock(SupervisorManager.class), mapper - ), - new TaskAuditLogConfig(true) + ) ); taskConfig = new TaskConfigBuilder() .setBaseDir(temporaryFolder.newFolder().toString()) @@ -767,12 +764,10 @@ public void testIndexTask() final TaskStatus mergedStatus = runTask(indexTask); final TaskStatus status = taskStorage.getStatus(indexTask.getId()).get(); final List publishedSegments = BY_INTERVAL_ORDERING.sortedCopy(mdc.getPublished()); - final List loggedSegments = BY_INTERVAL_ORDERING.sortedCopy(tsqa.getInsertedSegments(indexTask.getId())); Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode()); Assert.assertEquals(taskLocation, status.getLocation()); Assert.assertEquals("merged statusCode", TaskState.SUCCESS, mergedStatus.getStatusCode()); - Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments); Assert.assertEquals("num segments published", 2, mdc.getPublished().size()); Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); @@ -1143,7 +1138,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception .size(0) .build(); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment), null)); + toolbox.getTaskActionClient().submit( + SegmentTransactionalInsertAction.appendAction(ImmutableSet.of(segment), null, null, null) + ); return TaskStatus.success(getId()); } }; @@ -1184,7 +1181,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception .size(0) .build(); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment), null)); + toolbox.getTaskActionClient().submit( + SegmentTransactionalInsertAction.appendAction(ImmutableSet.of(segment), null, null, null) + ); return TaskStatus.success(getId()); } }; @@ -1226,7 +1225,9 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception .size(0) .build(); - toolbox.getTaskActionClient().submit(new SegmentInsertAction(ImmutableSet.of(segment), null)); + toolbox.getTaskActionClient().submit( + SegmentTransactionalInsertAction.appendAction(ImmutableSet.of(segment), null, null, null) + ); return TaskStatus.success(getId()); } }; @@ -1306,11 +1307,9 @@ public void testResumeTasks() throws Exception final TaskStatus status = taskStorage.getStatus(indexTask.getId()).get(); final List publishedSegments = BY_INTERVAL_ORDERING.sortedCopy(mdc.getPublished()); - final List loggedSegments = BY_INTERVAL_ORDERING.sortedCopy(tsqa.getInsertedSegments(indexTask.getId())); Assert.assertEquals("statusCode", TaskState.SUCCESS, status.getStatusCode()); Assert.assertEquals(taskLocation, status.getLocation()); - Assert.assertEquals("segments logged vs published", loggedSegments, publishedSegments); Assert.assertEquals("num segments published", 2, mdc.getPublished().size()); Assert.assertEquals("num segments nuked", 0, mdc.getNuked().size()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index 51b0cfe742e6..3af672a665b5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -197,10 +197,8 @@ public void tearDown() ); } - @Test - public void testLeader() + private void replayAll() { - EasyMock.expect(taskMaster.getCurrentLeader()).andReturn("boz").once(); EasyMock.replay( taskRunner, taskMaster, @@ -208,8 +206,18 @@ public void testLeader() indexerMetadataStorageAdapter, req, workerTaskRunnerQueryAdapter, - authConfig + authConfig, + configManager, + auditManager, + provisioningStrategy ); + } + + @Test + public void testLeader() + { + EasyMock.expect(taskMaster.getCurrentLeader()).andReturn("boz").once(); + replayAll(); final Response response = overlordResource.getLeader(); Assert.assertEquals("boz", response.getEntity()); @@ -221,15 +229,7 @@ public void testIsLeader() { EasyMock.expect(taskMaster.isLeader()).andReturn(true).once(); EasyMock.expect(taskMaster.isLeader()).andReturn(false).once(); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); // true final Response response1 = overlordResource.isLeader(); @@ -267,15 +267,7 @@ public void testSecuredGetWaitingTask() ) ); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); List responseObjects = (List) overlordResource.getWaitingTasks(req) .getEntity(); @@ -299,15 +291,7 @@ public void testSecuredGetCompleteTasks() createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow") ) ); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); List responseObjects = (List) overlordResource .getCompleteTasks(null, req).getEntity(); @@ -341,15 +325,7 @@ public void testSecuredGetRunningTasks() EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andStubReturn(RunnerTaskState.RUNNING); EasyMock.expect(taskRunner.getRunnerTaskState("id_2")).andStubReturn(RunnerTaskState.RUNNING); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); List responseObjects = (List) overlordResource.getRunningTasks(null, req) .getEntity(); @@ -390,15 +366,7 @@ public void testGetTasks() ) ).atLeastOnce(); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); List responseObjects = (List) overlordResource .getTasks(null, null, null, null, null, req) .getEntity(); @@ -437,15 +405,7 @@ public void testGetTasksFilterDataSource() new MockTaskRunnerWorkItem("id_4") ) ).atLeastOnce(); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); List responseObjects = (List) overlordResource .getTasks(null, "allow", null, null, null, req) @@ -484,15 +444,7 @@ public void testGetTasksFilterWaitingState() ) ); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); List responseObjects = (List) overlordResource .getTasks( "waiting", @@ -537,15 +489,7 @@ public void testGetTasksFilterRunningState() EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andReturn(RunnerTaskState.RUNNING); EasyMock.expect(taskRunner.getRunnerTaskState("id_2")).andReturn(RunnerTaskState.RUNNING); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); List responseObjects = (List) overlordResource .getTasks("running", "allow", null, null, null, req) @@ -587,15 +531,7 @@ public void testGetTasksFilterPendingState() EasyMock.expect(taskRunner.getRunnerTaskState("id_3")).andStubReturn(RunnerTaskState.RUNNING); EasyMock.expect(taskRunner.getRunnerTaskState("id_4")).andStubReturn(RunnerTaskState.RUNNING); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); List responseObjects = (List) overlordResource .getTasks("pending", null, null, null, null, req) @@ -622,15 +558,7 @@ public void testGetTasksFilterCompleteState() createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow") ) ); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); List responseObjects = (List) overlordResource .getTasks("complete", null, null, null, null, req) .getEntity(); @@ -657,15 +585,7 @@ public void testGetTasksFilterCompleteStateWithInterval() ) ); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); String interval = "2010-01-01_P1D"; List responseObjects = (List) overlordResource .getTasks("complete", null, interval, null, null, req) @@ -712,16 +632,7 @@ public void testGetTasksRequiresDatasourceRead() EasyMock.expect(taskRunner.getRunnerTaskState("id_4")).andReturn(RunnerTaskState.PENDING); EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andReturn(RunnerTaskState.RUNNING); - // Replay all mocks - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); // Verify that only the tasks of read access datasource are returned List responseObjects = (List) overlordResource @@ -769,16 +680,7 @@ public void testGetTasksFilterByTaskTypeRequiresDatasourceRead() EasyMock.expect(taskRunner.getRunnerTaskState("id_1")).andReturn(RunnerTaskState.RUNNING); - // Replay all mocks - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); // Verify that only the tasks of read access datasource are returned List responseObjects = (List) overlordResource @@ -797,16 +699,7 @@ public void testGetTasksFilterByDatasourceRequiresReadAccess() // and no access to "buzzfeed" expectAuthorizationTokenCheck(Users.WIKI_READER); - // Replay all mocks - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); // Verify that only the tasks of read access datasource are returned expectedException.expect(WebApplicationException.class); @@ -832,15 +725,7 @@ public void testGetCompleteTasksOfAllDatasources() createTaskStatusPlus("id_3", TaskState.SUCCESS, "allow") ) ); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); List responseObjects = (List) overlordResource .getTasks("complete", null, null, null, null, req) .getEntity(); @@ -852,15 +737,7 @@ public void testGetCompleteTasksOfAllDatasources() @Test public void testGetTasksNegativeState() { - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); Object responseObject = overlordResource .getTasks("blah", "ds_test", null, null, null, req) .getEntity(); @@ -877,15 +754,7 @@ public void testSecuredTaskPost() expectAuthorizationTokenCheck(); EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); Task task = NoopTask.create(); overlordResource.taskPost(task, req); } @@ -914,17 +783,7 @@ public void testKillTaskIsAudited() auditManager.doAudit(EasyMock.capture(auditEntryCapture)); EasyMock.expectLastCall().once(); - EasyMock.replay( - taskRunner, - taskMaster, - taskQueue, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig, - auditManager - ); + replayAll(); Task task = new KillUnusedSegmentsTask("kill_all", "allow", Intervals.ETERNITY, null, null, 10, null, null); overlordResource.taskPost(task, req); @@ -943,15 +802,7 @@ public void testTaskPostDeniesDatasourceReadUser() expectAuthorizationTokenCheck(Users.WIKI_READER); EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(false); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); // Verify that taskPost fails for user who has only datasource read access Task task = NoopTask.forDatasource(Datasources.WIKIPEDIA); @@ -975,15 +826,7 @@ public void testKillPendingSegments() ) .andReturn(2); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); Response response = overlordResource .killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req); @@ -1008,15 +851,7 @@ public void testKillPendingSegmentsThrowsInvalidInputDruidException() .andThrow(InvalidInput.exception(exceptionMsg)) .once(); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); Response response = overlordResource .killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req); @@ -1042,15 +877,7 @@ public void testKillPendingSegmentsThrowsDefensiveDruidException() .andThrow(DruidException.defensive(exceptionMsg)) .once(); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); Response response = overlordResource .killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req); @@ -1076,15 +903,7 @@ public void testKillPendingSegmentsThrowsArbitraryException() .andThrow(new IllegalStateException(exceptionMsg)) .once(); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); Response response = overlordResource .killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req); @@ -1100,15 +919,7 @@ public void testKillPendingSegmentsToNonLeader() EasyMock.expect(taskMaster.isLeader()).andReturn(false); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); Response response = overlordResource .killPendingSegments("allow", new Interval(DateTimes.MIN, DateTimes.nowUtc()).toString(), req); @@ -1131,15 +942,7 @@ public void testGetTaskPayload() throws Exception EasyMock.expect(taskStorageQueryAdapter.getTask("othertask")) .andReturn(Optional.absent()); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); final Response response1 = overlordResource.getTaskPayload("mytask"); final TaskPayloadResponse taskPayloadResponse1 = TestHelper.makeJsonMapper().readValue( @@ -1182,15 +985,7 @@ public void testGetTaskStatus() throws Exception EasyMock.>expect(taskRunner.getKnownTasks()) .andReturn(ImmutableList.of()); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); final Response response1 = overlordResource.getTaskStatus(taskId); final TaskStatusResponse taskStatusResponse1 = TestHelper.makeJsonMapper().readValue( @@ -1241,15 +1036,7 @@ public void testGetLockedIntervals() throws Exception EasyMock.expect(taskStorageQueryAdapter.getLockedIntervals(minTaskPriority)) .andReturn(expectedLockedIntervals); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); final Response response = overlordResource.getDatasourceLockedIntervals(minTaskPriority); Assert.assertEquals(200, response.getStatus()); @@ -1268,15 +1055,7 @@ public void testGetLockedIntervals() throws Exception @Test public void testGetLockedIntervalsWithEmptyBody() { - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); Response response = overlordResource.getDatasourceLockedIntervals(null); Assert.assertEquals(400, response.getStatus()); @@ -1302,16 +1081,8 @@ public void testShutdownTask() mockQueue.shutdown("id_1", "Shutdown request from user"); EasyMock.expectLastCall(); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - mockQueue, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); + EasyMock.replay(mockQueue); final Map response = (Map) overlordResource .doShutdown("id_1") @@ -1354,16 +1125,8 @@ public void testShutdownAllTasks() mockQueue.shutdown("id_2", "Shutdown request from user"); EasyMock.expectLastCall(); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - mockQueue, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); + EasyMock.replay(mockQueue); final Map response = (Map) overlordResource .shutdownTasksForDataSource("datasource") @@ -1378,15 +1141,7 @@ public void testShutdownAllTasksForNonExistingDataSource() EasyMock.expect(taskMaster.isLeader()).andReturn(true).anyTimes(); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); EasyMock.expect(taskStorageQueryAdapter.getActiveTaskInfo(EasyMock.anyString())).andReturn(Collections.emptyList()); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); final Response response = overlordResource.shutdownTasksForDataSource("notExisting"); Assert.assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus()); @@ -1400,15 +1155,7 @@ public void testEnableWorker() workerTaskRunnerQueryAdapter.enableWorker(host); EasyMock.expectLastCall().once(); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); final Response response = overlordResource.enableWorker(host); @@ -1424,15 +1171,7 @@ public void testDisableWorker() workerTaskRunnerQueryAdapter.disableWorker(host); EasyMock.expectLastCall().once(); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); final Response response = overlordResource.disableWorker(host); @@ -1448,15 +1187,7 @@ public void testEnableWorkerWhenWorkerAPIRaisesError() workerTaskRunnerQueryAdapter.enableWorker(host); EasyMock.expectLastCall().andThrow(new RE("Worker API returns error!")).once(); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); final Response response = overlordResource.enableWorker(host); @@ -1472,15 +1203,7 @@ public void testDisableWorkerWhenWorkerAPIRaisesError() workerTaskRunnerQueryAdapter.disableWorker(host); EasyMock.expectLastCall().andThrow(new RE("Worker API returns error!")).once(); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - authConfig - ); + replayAll(); final Response response = overlordResource.disableWorker(host); @@ -1495,16 +1218,7 @@ public void testGetTotalWorkerCapacityNotLeader() EasyMock.expect(taskMaster.getTaskRunner()).andReturn( Optional.absent() ).anyTimes(); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - configManager, - authConfig - ); + replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.SERVICE_UNAVAILABLE.getCode(), response.getStatus()); } @@ -1517,16 +1231,7 @@ public void testGetTotalWorkerCapacityWithUnknown() EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - configManager, - authConfig - ); + replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); @@ -1541,16 +1246,7 @@ public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfi EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - configManager, - authConfig - ); + replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); @@ -1566,16 +1262,7 @@ public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigu EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); - EasyMock.replay( - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - configManager, - authConfig - ); + replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); @@ -1617,17 +1304,9 @@ public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStra EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); EasyMock.replay( workerTaskRunner, - autoScaler, - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - configManager, - provisioningStrategy, - authConfig + autoScaler ); + replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); @@ -1670,17 +1349,9 @@ public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStra EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); EasyMock.replay( workerTaskRunner, - autoScaler, - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter, - configManager, - provisioningStrategy, - authConfig + autoScaler ); + replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(), ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); @@ -1706,16 +1377,8 @@ public void testResourceActionsForTaskWithInputTypeAndInputSecurityEnabled() Action.READ ))); - EasyMock.replay( - task, - authConfig, - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter - ); + EasyMock.replay(task); + replayAll(); Set expectedResourceActions = ImmutableSet.of( new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE), @@ -1728,7 +1391,6 @@ public void testResourceActionsForTaskWithInputTypeAndInputSecurityEnabled() @Test public void testResourceActionsForTaskWithFirehoseAndInputSecurityEnabled() { - final String dataSource = "dataSourceTest"; final UOE expectedException = new UOE("unsupported"); Task task = EasyMock.createMock(Task.class); @@ -1739,17 +1401,8 @@ public void testResourceActionsForTaskWithFirehoseAndInputSecurityEnabled() EasyMock.expect(task.getDestinationResource()).andReturn(java.util.Optional.of(new Resource(dataSource, ResourceType.DATASOURCE))); EasyMock.expect(task.getInputSourceResources()).andThrow(expectedException); - EasyMock.replay( - task, - authConfig, - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter - ); - + EasyMock.replay(task); + replayAll(); final UOE e = Assert.assertThrows( UOE.class, @@ -1762,7 +1415,6 @@ public void testResourceActionsForTaskWithFirehoseAndInputSecurityEnabled() @Test public void testResourceActionsForTaskWithInputTypeAndInputSecurityDisabled() { - final String dataSource = "dataSourceTest"; final String inputSourceType = "local"; Task task = EasyMock.createMock(Task.class); @@ -1776,16 +1428,8 @@ public void testResourceActionsForTaskWithInputTypeAndInputSecurityDisabled() Action.READ ))); - EasyMock.replay( - task, - authConfig, - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter - ); + EasyMock.replay(task); + replayAll(); Set expectedResourceActions = ImmutableSet.of( new ResourceAction(new Resource(dataSource, ResourceType.DATASOURCE), Action.WRITE) @@ -1797,24 +1441,14 @@ public void testResourceActionsForTaskWithInputTypeAndInputSecurityDisabled() @Test public void testGetMultipleTaskStatuses_presentTaskQueue() { - // Needed for teardown - EasyMock.replay( - authConfig, - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter - ); + replayAll(); TaskQueue taskQueue = EasyMock.createMock(TaskQueue.class); EasyMock.expect(taskQueue.getTaskStatus("task")) .andReturn(Optional.of(TaskStatus.running("task"))); - EasyMock.replay(taskQueue); TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)); - EasyMock.replay(taskMaster); + EasyMock.replay(taskMaster, taskQueue); OverlordResource overlordResource = new OverlordResource( taskMaster, null, @@ -1835,24 +1469,14 @@ public void testGetMultipleTaskStatuses_presentTaskQueue() @Test public void testGetMultipleTaskStatuses_absentTaskQueue() { - // Needed for teardown - EasyMock.replay( - authConfig, - taskRunner, - taskMaster, - taskStorageQueryAdapter, - indexerMetadataStorageAdapter, - req, - workerTaskRunnerQueryAdapter - ); + replayAll(); TaskStorageQueryAdapter taskStorageQueryAdapter = EasyMock.createMock(TaskStorageQueryAdapter.class); EasyMock.expect(taskStorageQueryAdapter.getStatus("task")) .andReturn(Optional.of(TaskStatus.running("task"))); - EasyMock.replay(taskStorageQueryAdapter); TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class); EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()); - EasyMock.replay(taskMaster); + EasyMock.replay(taskMaster, taskStorageQueryAdapter); OverlordResource overlordResource = new OverlordResource( taskMaster, taskStorageQueryAdapter, @@ -1870,6 +1494,24 @@ public void testGetMultipleTaskStatuses_absentTaskQueue() Assert.assertEquals(ImmutableMap.of("task", TaskStatus.running("task")), response); } + @Test + public void testGetTaskSegmentsReturns404() + { + replayAll(); + OverlordResource overlordResource = + new OverlordResource(null, null, null, null, null, null, null, null, null, null); + final Response response = overlordResource.getTaskSegments("taskId"); + Assert.assertEquals(404, response.getStatus()); + Assert.assertEquals( + Collections.singletonMap( + "error", + "Segment IDs committed by a task action are not persisted anymore." + + " Use the metric 'segment/added/bytes' to identify the segments created by a task." + ), + response.getEntity() + ); + } + private void expectAuthorizationTokenCheck() { expectAuthorizationTokenCheck(Users.DRUID); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java index 1a9e5a17e1cf..3b5775c22b83 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTestBase.java @@ -62,7 +62,6 @@ import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.actions.TaskActionToolbox; -import org.apache.druid.indexing.common.actions.TaskAuditLogConfig; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskConfigBuilder; import org.apache.druid.indexing.common.config.TaskStorageConfig; @@ -631,9 +630,7 @@ public boolean checkPointDataSourceMetadata( objectMapper ); final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( - taskStorage, - taskActionToolbox, - new TaskAuditLogConfig(false) + taskActionToolbox ); final SegmentHandoffNotifierFactory handoffNotifierFactory = dataSource -> new SegmentHandoffNotifier() { diff --git a/processing/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java b/processing/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java index 45bfde49a8af..4a5a48e8ef78 100644 --- a/processing/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java +++ b/processing/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandler.java @@ -20,6 +20,8 @@ package org.apache.druid.metadata; import com.google.common.base.Optional; +import org.apache.druid.error.DruidException; +import org.apache.druid.guice.annotations.ExtensionPoint; import org.apache.druid.indexer.TaskIdentifier; import org.apache.druid.indexer.TaskInfo; import org.apache.druid.metadata.TaskLookup.TaskLookupType; @@ -31,6 +33,7 @@ import java.util.List; import java.util.Map; +@ExtensionPoint public interface MetadataStorageActionHandler { /** @@ -161,21 +164,34 @@ default List> getTaskInfos( void removeTasksOlderThan(long timestamp); /** - * Add a log to the entry with the given id. + * Task logs are not used anymore and this method is never called by Druid code. + * It has been retained only for backwards compatibility with older extensions. + * New extensions must not implement this method. * - * @param entryId entry id - * @param log log to add - * @return true if the log was added + * @throws DruidException of category UNSUPPORTED whenever called. */ - boolean addLog(String entryId, LogType log); + @Deprecated + default boolean addLog(String entryId, LogType log) + { + throw DruidException.defensive() + .ofCategory(DruidException.Category.UNSUPPORTED) + .build("Task actions are not logged anymore."); + } /** - * Returns the logs for the entry with the given id. + * Task logs are not used anymore and this method is never called by Druid code. + * It has been retained only for backwards compatibility with older extensions. + * New extensions must not implement this method. * - * @param entryId entry id - * @return list of logs + * @throws DruidException of category UNSUPPORTED whenever called. */ - List getLogs(String entryId); + @Deprecated + default List getLogs(String entryId) + { + throw DruidException.defensive() + .ofCategory(DruidException.Category.UNSUPPORTED) + .build("Task actions are not logged anymore."); + } /** * Returns the locks for the given entry @@ -188,7 +204,7 @@ default List> getTaskInfos( /** * Returns the lock id for the given entry and the lock. * - * @return lock id if found. Otherwise null. + * @return lock id if found, otherwise null. */ @Nullable Long getLockId(String entryId, LockType lock); diff --git a/processing/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerTypes.java b/processing/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerTypes.java index 79098635ca7f..a3597d988c1a 100644 --- a/processing/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerTypes.java +++ b/processing/src/main/java/org/apache/druid/metadata/MetadataStorageActionHandlerTypes.java @@ -25,6 +25,5 @@ public interface MetadataStorageActionHandlerTypes getEntryType(); TypeReference getStatusType(); - TypeReference getLogType(); TypeReference getLockType(); } diff --git a/processing/src/test/java/org/apache/druid/metadata/MetadataStorageActionHandlerTest.java b/processing/src/test/java/org/apache/druid/metadata/MetadataStorageActionHandlerTest.java new file mode 100644 index 000000000000..41a0a55b284f --- /dev/null +++ b/processing/src/test/java/org/apache/druid/metadata/MetadataStorageActionHandlerTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.metadata; + +import com.google.common.base.Optional; +import org.apache.druid.error.DruidException; +import org.apache.druid.indexer.TaskIdentifier; +import org.apache.druid.indexer.TaskInfo; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * Tests the default methods of the interface {@link MetadataStorageActionHandler}. + * Required only for coverage as these methods are already being tested in + * {@code SQLMetadataStorageActionHandlerTest}. + */ +public class MetadataStorageActionHandlerTest +{ + + private MetadataStorageActionHandler handler; + + @Before + public void setup() + { + this.handler = new MetadataStorageActionHandler() + { + @Override + public void insert( + String id, + DateTime timestamp, + String dataSource, + String entry, + boolean active, + @Nullable String status, + String type, + String groupId + ) + { + + } + + @Override + public boolean setStatus(String entryId, boolean active, String status) + { + return false; + } + + @Override + public Optional getEntry(String entryId) + { + return null; + } + + @Override + public Optional getStatus(String entryId) + { + return null; + } + + @Nullable + @Override + public TaskInfo getTaskInfo(String entryId) + { + return null; + } + + @Override + public List> getTaskInfos( + Map taskLookups, + @Nullable String datasource + ) + { + return Collections.emptyList(); + } + + @Override + public List> getTaskStatusList( + Map taskLookups, + @Nullable String datasource + ) + { + return Collections.emptyList(); + } + + @Override + public boolean addLock(String entryId, String lock) + { + return false; + } + + @Override + public boolean replaceLock(String entryId, long oldLockId, String newLock) + { + return false; + } + + @Override + public void removeLock(long lockId) + { + + } + + @Override + public void removeTasksOlderThan(long timestamp) + { + + } + + @Override + public Map getLocks(String entryId) + { + return Collections.emptyMap(); + } + + @Override + public Long getLockId(String entryId, String lock) + { + return 0L; + } + + @Override + public void populateTaskTypeAndGroupIdAsync() + { + + } + }; + } + + @Test + public void testAddLogThrowsUnsupportedException() + { + Exception exception = Assert.assertThrows( + DruidException.class, + () -> handler.addLog("abcd", "logentry") + ); + Assert.assertEquals( + "Task actions are not logged anymore.", + exception.getMessage() + ); + } + + @Test + public void testGetLogsThrowsUnsupportedException() + { + Exception exception = Assert.assertThrows( + DruidException.class, + () -> handler.getLogs("abcd") + ); + Assert.assertEquals( + "Task actions are not logged anymore.", + exception.getMessage() + ); + } +} diff --git a/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java index fa39d506b4cc..c3265f388294 100644 --- a/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/DerbyMetadataStorageActionHandler.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; -import org.apache.druid.java.util.common.StringUtils; public class DerbyMetadataStorageActionHandler extends SQLMetadataStorageActionHandler @@ -46,12 +45,4 @@ protected String decorateSqlWithLimit(String sql) return sql + " FETCH FIRST :n ROWS ONLY"; } - @Deprecated - @Override - public String getSqlRemoveLogsOlderThan() - { - return StringUtils.format("DELETE FROM %s WHERE %s_id in (" - + " SELECT id FROM %s WHERE created_date < :date_time and active = false)", - getLogTable(), getEntryTypeName(), getEntryTable()); - } } diff --git a/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java index b91427bcd662..6bed60b89b41 100644 --- a/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/PostgreSQLMetadataStorageActionHandler.java @@ -20,7 +20,6 @@ package org.apache.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.druid.java.util.common.StringUtils; public class PostgreSQLMetadataStorageActionHandler extends SQLMetadataStorageActionHandler @@ -44,13 +43,4 @@ protected String decorateSqlWithLimit(String sql) return sql + " LIMIT :n"; } - @Deprecated - @Override - public String getSqlRemoveLogsOlderThan() - { - return StringUtils.format("DELETE FROM %s USING %s " - + "WHERE %s_id = %s.id AND created_date < :date_time and active = false", - getLogTable(), getEntryTable(), getEntryTypeName(), getEntryTable()); - } - } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java index 2d315d19fc8b..02a3aadb7ba8 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java @@ -518,25 +518,6 @@ private void alterPendingSegmentsTable(final String tableName) ); } - public void createLogTable(final String tableName, final String entryTypeName) - { - createTable( - tableName, - ImmutableList.of( - StringUtils.format( - "CREATE TABLE %1$s (\n" - + " id %2$s NOT NULL,\n" - + " %4$s_id VARCHAR(255) DEFAULT NULL,\n" - + " log_payload %3$s,\n" - + " PRIMARY KEY (id)\n" - + ")", - tableName, getSerialType(), getPayloadType(), entryTypeName - ), - StringUtils.format("CREATE INDEX idx_%1$s_%2$s_id ON %1$s(%2$s_id)", tableName, entryTypeName) - ) - ); - } - public void createLockTable(final String tableName, final String entryTypeName) { createTable( @@ -804,7 +785,6 @@ public void createTaskTables() final MetadataStorageTablesConfig tablesConfig = tablesConfigSupplier.get(); final String entryType = tablesConfig.getTaskEntryType(); prepareTaskEntryTable(tablesConfig.getEntryTable(entryType)); - createLogTable(tablesConfig.getLogTable(entryType), entryType); createLockTable(tablesConfig.getLockTable(entryType), entryType); } } diff --git a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java index 8b003340bed0..6cbefc84eeb4 100644 --- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataStorageActionHandler.java @@ -75,12 +75,10 @@ public abstract class SQLMetadataStorageActionHandler entryType; private final TypeReference statusType; - private final TypeReference logType; private final TypeReference lockType; private final String entryTypeName; private final String entryTable; - private final String logTable; private final String lockTable; private final TaskInfoMapper taskInfoMapper; @@ -90,7 +88,11 @@ public abstract class SQLMetadataStorageActionHandler taskMigrationCompleteFuture; - @SuppressWarnings("PMD.UnnecessaryFullyQualifiedName") + /** + * @deprecated Use the other constructor without {@code logTable} argument + * since this argument is now unused. + */ + @Deprecated public SQLMetadataStorageActionHandler( final SQLMetadataConnector connector, final ObjectMapper jsonMapper, @@ -100,6 +102,19 @@ public SQLMetadataStorageActionHandler( final String logTable, final String lockTable ) + { + this(connector, jsonMapper, types, entryTypeName, entryTable, lockTable); + } + + @SuppressWarnings("PMD.UnnecessaryFullyQualifiedName") + public SQLMetadataStorageActionHandler( + final SQLMetadataConnector connector, + final ObjectMapper jsonMapper, + final MetadataStorageActionHandlerTypes types, + final String entryTypeName, + final String entryTable, + final String lockTable + ) { this.connector = connector; //fully qualified references required below due to identical package names across project modules. @@ -108,11 +123,9 @@ public SQLMetadataStorageActionHandler( org.apache.druid.metadata.PasswordProviderRedactionMixIn.class); this.entryType = types.getEntryType(); this.statusType = types.getStatusType(); - this.logType = types.getLogType(); this.lockType = types.getLockType(); this.entryTypeName = entryTypeName; this.entryTable = entryTable; - this.logTable = logTable; this.lockTable = lockTable; this.taskInfoMapper = new TaskInfoMapper<>(jsonMapper, entryType, statusType); this.taskStatusMapper = new TaskStatusMapper(jsonMapper); @@ -142,7 +155,7 @@ protected String getEntryTable() protected String getLogTable() { - return logTable; + throw new UnsupportedOperationException("'tasklogs' table is not used anymore"); } protected String getEntryTypeName() @@ -430,7 +443,7 @@ List> getTaskStatusList( } /** - * Wraps the given error in a user friendly DruidException. + * Wraps the given error in a user-friendly DruidException. */ private DruidException wrapInDruidException(String taskId, Throwable t) { @@ -855,21 +868,13 @@ public void removeTasksOlderThan(final long timestamp) { DateTime dateTime = DateTimes.utc(timestamp); connector.retryWithHandle( - handle -> { - handle.createStatement(getSqlRemoveLogsOlderThan()) - .bind("date_time", dateTime.toString()) - .execute(); + handle -> handle.createStatement( StringUtils.format( "DELETE FROM %s WHERE created_date < :date_time AND active = false", entryTable ) - ) - .bind("date_time", dateTime.toString()) - .execute(); - - return null; - } + ).bind("date_time", dateTime.toString()).execute() ); } @@ -880,78 +885,6 @@ private int removeLock(Handle handle, long lockId) .execute(); } - @Override - public boolean addLog(final String entryId, final LogType log) - { - return connector.retryWithHandle( - new HandleCallback() - { - @Override - public Boolean withHandle(Handle handle) throws Exception - { - return handle.createStatement( - StringUtils.format( - "INSERT INTO %1$s (%2$s_id, log_payload) VALUES (:entryId, :payload)", - logTable, entryTypeName - ) - ) - .bind("entryId", entryId) - .bind("payload", jsonMapper.writeValueAsBytes(log)) - .execute() == 1; - } - } - ); - } - - @Override - public List getLogs(final String entryId) - { - return connector.retryWithHandle( - new HandleCallback>() - { - @Override - public List withHandle(Handle handle) - { - return handle - .createQuery( - StringUtils.format( - "SELECT log_payload FROM %1$s WHERE %2$s_id = :entryId", - logTable, entryTypeName - ) - ) - .bind("entryId", entryId) - .map(ByteArrayMapper.FIRST) - .fold( - new ArrayList<>(), - (List list, byte[] bytes, FoldController control, StatementContext ctx) -> { - try { - list.add(jsonMapper.readValue(bytes, logType)); - return list; - } - catch (IOException e) { - log.makeAlert(e, "Failed to deserialize log") - .addData("entryId", entryId) - .addData("payload", StringUtils.fromUtf8(bytes)) - .emit(); - throw new SQLException(e); - } - } - ); - } - } - ); - } - - @Deprecated - public String getSqlRemoveLogsOlderThan() - { - return StringUtils.format( - "DELETE a FROM %s a INNER JOIN %s b ON a.%s_id = b.id " - + "WHERE b.created_date < :date_time and b.active = false", - logTable, entryTable, entryTypeName - ); - } - @Override public Map getLocks(final String entryId) { diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorSchemaPersistenceTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorSchemaPersistenceTest.java index e103b428fedb..b59af9ef690b 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorSchemaPersistenceTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorSchemaPersistenceTest.java @@ -56,7 +56,6 @@ public void testCreateTables() tables.add(tablesConfig.getSegmentsTable()); tables.add(tablesConfig.getRulesTable()); tables.add(tablesConfig.getLockTable(entryType)); - tables.add(tablesConfig.getLogTable(entryType)); tables.add(tablesConfig.getEntryTable(entryType)); tables.add(tablesConfig.getAuditTable()); tables.add(tablesConfig.getSupervisorTable()); @@ -67,7 +66,6 @@ public void testCreateTables() dropSequence.add(tablesConfig.getSegmentSchemasTable()); dropSequence.add(tablesConfig.getRulesTable()); dropSequence.add(tablesConfig.getLockTable(entryType)); - dropSequence.add(tablesConfig.getLogTable(entryType)); dropSequence.add(tablesConfig.getEntryTable(entryType)); dropSequence.add(tablesConfig.getAuditTable()); dropSequence.add(tablesConfig.getSupervisorTable()); diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java index 484299b5636b..a40f95c31b92 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataConnectorTest.java @@ -75,7 +75,6 @@ public void testCreateTables() tables.add(tablesConfig.getSegmentsTable()); tables.add(tablesConfig.getRulesTable()); tables.add(tablesConfig.getLockTable(entryType)); - tables.add(tablesConfig.getLogTable(entryType)); tables.add(tablesConfig.getEntryTable(entryType)); tables.add(tablesConfig.getAuditTable()); tables.add(tablesConfig.getSupervisorTable()); diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java index 6cddcd5d6468..8117d6dcb735 100644 --- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java +++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataStorageActionHandlerTest.java @@ -34,7 +34,6 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup; import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup; import org.joda.time.DateTime; @@ -73,12 +72,10 @@ public void setUp() TestDerbyConnector connector = derbyConnectorRule.getConnector(); final String entryType = "entry"; - final String logTable = "logs"; final String lockTable = "locks"; connector.prepareTaskEntryTable(entryTable); connector.createLockTable(lockTable, entryType); - connector.createLogTable(logTable, entryType); handler = new DerbyMetadataStorageActionHandler<>( connector, @@ -101,12 +98,6 @@ public TypeReference> getStatusType() }; } - @Override - public TypeReference> getLogType() - { - return JacksonUtils.TYPE_REFERENCE_MAP_STRING_STRING; - } - @Override public TypeReference> getLockType() { @@ -117,7 +108,7 @@ public TypeReference> getLockType() }, entryType, entryTable, - logTable, + null, lockTable ); } @@ -247,37 +238,31 @@ public void testDuplicateInsertThrowsEntryExistsException() } @Test - public void testLogs() + public void testAddLogThrowsUnsupportedException() { - final String entryId = "abcd"; - Map entry = ImmutableMap.of("a", 1); - Map status = ImmutableMap.of("count", 42); - - handler.insert(entryId, DateTimes.of("2014-01-01"), "test", entry, true, status, "type", "group"); - - Assert.assertEquals( - ImmutableList.of(), - handler.getLogs("non_exist_entry") + Exception exception = Assert.assertThrows( + DruidException.class, + () -> handler.addLog("abcd", ImmutableMap.of("logentry", "created")) ); - Assert.assertEquals( - ImmutableMap.of(), - handler.getLocks(entryId) + "Task actions are not logged anymore.", + exception.getMessage() ); + } - final ImmutableMap log1 = ImmutableMap.of("logentry", "created"); - final ImmutableMap log2 = ImmutableMap.of("logentry", "updated"); - - Assert.assertTrue(handler.addLog(entryId, log1)); - Assert.assertTrue(handler.addLog(entryId, log2)); - + @Test + public void testGetLogsThrowsUnsupportedException() + { + Exception exception = Assert.assertThrows( + DruidException.class, + () -> handler.getLogs("abcd") + ); Assert.assertEquals( - ImmutableList.of(log1, log2), - handler.getLogs(entryId) + "Task actions are not logged anymore.", + exception.getMessage() ); } - @Test public void testLocks() { @@ -388,19 +373,16 @@ public void testRemoveTasksOlderThan() Map entry1 = ImmutableMap.of("numericId", 1234); Map status1 = ImmutableMap.of("count", 42, "temp", 1); handler.insert(entryId1, DateTimes.of("2014-01-01T00:00:00.123"), "testDataSource", entry1, false, status1, "type", "group"); - Assert.assertTrue(handler.addLog(entryId1, ImmutableMap.of("logentry", "created"))); final String entryId2 = "ABC123"; Map entry2 = ImmutableMap.of("a", 1); Map status2 = ImmutableMap.of("count", 42); handler.insert(entryId2, DateTimes.of("2014-01-01T00:00:00.123"), "test", entry2, true, status2, "type", "group"); - Assert.assertTrue(handler.addLog(entryId2, ImmutableMap.of("logentry", "created"))); final String entryId3 = "DEF5678"; Map entry3 = ImmutableMap.of("numericId", 5678); Map status3 = ImmutableMap.of("count", 21, "temp", 2); handler.insert(entryId3, DateTimes.of("2014-01-02T12:00:00.123"), "testDataSource", entry3, false, status3, "type", "group"); - Assert.assertTrue(handler.addLog(entryId3, ImmutableMap.of("logentry", "created"))); Assert.assertEquals(Optional.of(entry1), handler.getEntry(entryId1)); Assert.assertEquals(Optional.of(entry2), handler.getEntry(entryId2)); @@ -438,10 +420,6 @@ public void testRemoveTasksOlderThan() .collect(Collectors.toList()) ); - // tasklogs - Assert.assertEquals(0, handler.getLogs(entryId1).size()); - Assert.assertEquals(1, handler.getLogs(entryId2).size()); - Assert.assertEquals(1, handler.getLogs(entryId3).size()); } @Test diff --git a/services/src/main/java/org/apache/druid/cli/CliOverlord.java b/services/src/main/java/org/apache/druid/cli/CliOverlord.java index 24e98427ce95..da4b7e02714e 100644 --- a/services/src/main/java/org/apache/druid/cli/CliOverlord.java +++ b/services/src/main/java/org/apache/druid/cli/CliOverlord.java @@ -58,7 +58,6 @@ import org.apache.druid.indexing.common.actions.LocalTaskActionClientFactory; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.actions.TaskActionToolbox; -import org.apache.druid.indexing.common.actions.TaskAuditLogConfig; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; @@ -209,7 +208,6 @@ public void configure(Binder binder) JsonConfigProvider.bind(binder, "druid.indexer.tasklock", TaskLockConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class); JsonConfigProvider.bind(binder, "druid.indexer.task.default", DefaultTaskConfig.class); - JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class); binder.bind(RetryPolicyFactory.class).in(LazySingleton.class); binder.bind(TaskMaster.class).in(ManageLifecycle.class); diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 1ca8ddf539fc..501e8d69733c 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -78,7 +78,6 @@ import org.apache.druid.indexing.common.actions.RemoteTaskActionClientFactory; import org.apache.druid.indexing.common.actions.TaskActionClientFactory; import org.apache.druid.indexing.common.actions.TaskActionToolbox; -import org.apache.druid.indexing.common.actions.TaskAuditLogConfig; import org.apache.druid.indexing.common.config.TaskConfig; import org.apache.druid.indexing.common.config.TaskStorageConfig; import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory; @@ -482,7 +481,6 @@ static void bindTaskConfigAndClients(Binder binder) binder.bind(TaskToolboxFactory.class).in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.indexer.task", TaskConfig.class); - JsonConfigProvider.bind(binder, "druid.indexer.auditlog", TaskAuditLogConfig.class); JsonConfigProvider.bind(binder, "druid.peon.taskActionClient.retry", RetryPolicyConfig.class); configureTaskActionClient(binder);