Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/druid/audit/AuditManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,12 @@ public interface AuditManager
* @return list of AuditEntries satisfying the passed parameters
*/
List<AuditEntry> fetchAuditHistory(String type, int limit);

/**
* Remove audit logs created older than the given timestamp.
*
* @param timestamp timestamp in milliseconds
* @return number of audit logs removed
*/
int removeAuditLogsOlderThan(long timestamp);
}
9 changes: 9 additions & 0 deletions docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,15 @@ These Coordinator static configurations can be defined in the `coordinator/runti
|`druid.coordinator.asOverlord.enabled`|Boolean value for whether this Coordinator process should act like an Overlord as well. This configuration allows users to simplify a druid cluster by not having to deploy any standalone Overlord processes. If set to true, then Overlord console is available at `http://coordinator-host:port/console.html` and be sure to set `druid.coordinator.asOverlord.overlordService` also. See next.|false|
|`druid.coordinator.asOverlord.overlordService`| Required, if `druid.coordinator.asOverlord.enabled` is `true`. This must be same value as `druid.service` on standalone Overlord processes and `druid.selectors.indexing.serviceName` on Middle Managers.|NULL|

##### Metadata Management

|Property|Description|Required?|Default|
|--------|-----------|---------|-------|
|`druid.coordinator.period.metadataStoreManagementPeriod`|How often to run metadata management tasks in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. |No | `PT1H`|
|`druid.coordinator.kill.audit.on`| Boolean value for whether to enable automatic deletion of audit logs. If set to true, Coordinator will periodically remove audit logs from the audit table entries in metadata storage.| No | False|
|`druid.coordinator.kill.audit.period`| How often to do automatic deletion of audit logs in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Value must be greater than `druid.coordinator.period.metadataStoreManagementPeriod`. Only applies if `druid.coordinator.kill.audit.on` is set to True.| No| `P1D`|
|`druid.coordinator.kill.audit.durationToRetain`| Duration of audit logs to be retained from created time in [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601) duration format. Only applies if `druid.coordinator.kill.audit.on` is set to True.| Yes if `druid.coordinator.kill.audit.on` is set to True| None|

##### Segment Management
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
Expand Down
2 changes: 2 additions & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
|`interval/skipCompact/count`|Total number of intervals of this datasource that are skipped (not eligible for auto compaction) by the auto compaction.|datasource.|Varies.|
|`coordinator/time`|Approximate Coordinator duty runtime in milliseconds. The duty dimension is the string alias of the Duty that is being run.|duty.|Varies.|
|`coordinator/global/time`|Approximate runtime of a full coordination cycle in milliseconds. The `dutyGroup` dimension indicates what type of coordination this run was. i.e. Historical Management vs Indexing|`dutyGroup`|Varies.|
|`metadata/kill/audit/count`|Total number of audit logs automatically deleted from metadata store audit table per each Coordinator kill audit duty run. This metric can help adjust `druid.coordinator.kill.audit.durationToRetain` configuration based on if more or less audit logs need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.audit.on` is set to true.| |Varies.|


If `emitBalancingStats` is set to `true` in the Coordinator [dynamic configuration](../configuration/index.md#dynamic-configuration), then [log entries](../configuration/logging.md) for class
`org.apache.druid.server.coordinator.duty.EmitClusterStatsAndMetrics` will have extra information on balancing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.guice.annotations;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
*/
@BindingAnnotation
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface CoordinatorMetadataStoreManagementDuty
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.Query;
import org.skife.jdbi.v2.Update;
import org.skife.jdbi.v2.tweak.HandleCallback;

import java.io.IOException;
Expand Down Expand Up @@ -229,6 +230,24 @@ public List<AuditEntry> fetchAuditHistory(final String type, int limit)
return fetchAuditHistoryLastEntries(null, type, limit);
}

@Override
public int removeAuditLogsOlderThan(final long timestamp)
{
DateTime dateTime = DateTimes.utc(timestamp);
return dbi.withHandle(
handle -> {
Update sql = handle.createStatement(
StringUtils.format(
"DELETE FROM %s WHERE created_date < :date_time",
getAuditTable()
)
);
return sql.bind("date_time", dateTime.toString())
.execute();
}
);
}

private List<AuditEntry> fetchAuditHistoryLastEntries(final String key, final String type, int limit)
throws IllegalArgumentException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.druid.discovery.DruidLeaderSelector;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.annotations.CoordinatorIndexingServiceDuty;
import org.apache.druid.guice.annotations.CoordinatorMetadataStoreManagementDuty;
import org.apache.druid.guice.annotations.Self;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
Expand Down Expand Up @@ -150,6 +151,7 @@ public class DruidCoordinator
private final ServiceAnnouncer serviceAnnouncer;
private final DruidNode self;
private final Set<CoordinatorDuty> indexingServiceDuties;
private final Set<CoordinatorDuty> metadataStoreManagementDuties;
private final BalancerStrategyFactory factory;
private final LookupCoordinatorManager lookupCoordinatorManager;
private final DruidLeaderSelector coordLeaderSelector;
Expand All @@ -164,6 +166,7 @@ public class DruidCoordinator
private ListeningExecutorService balancerExec;

private static final String HISTORICAL_MANAGEMENT_DUTIES_DUTY_GROUP = "HistoricalManagementDuties";
private static final String METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP = "MetadataStoreManagementDuties";
private static final String INDEXING_SERVICE_DUTIES_DUTY_GROUP = "IndexingServiceDuties";
private static final String COMPACT_SEGMENTS_DUTIES_DUTY_GROUP = "CompactSegmentsDuties";

Expand All @@ -182,6 +185,7 @@ public DruidCoordinator(
LoadQueueTaskMaster taskMaster,
ServiceAnnouncer serviceAnnouncer,
@Self DruidNode self,
@CoordinatorMetadataStoreManagementDuty Set<CoordinatorDuty> metadataStoreManagementDuties,
@CoordinatorIndexingServiceDuty Set<CoordinatorDuty> indexingServiceDuties,
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
Expand All @@ -206,6 +210,7 @@ public DruidCoordinator(
self,
new ConcurrentHashMap<>(),
indexingServiceDuties,
metadataStoreManagementDuties,
factory,
lookupCoordinatorManager,
coordLeaderSelector,
Expand All @@ -230,6 +235,7 @@ public DruidCoordinator(
DruidNode self,
ConcurrentMap<String, LoadQueuePeon> loadQueuePeonMap,
Set<CoordinatorDuty> indexingServiceDuties,
Set<CoordinatorDuty> metadataStoreManagementDuties,
BalancerStrategyFactory factory,
LookupCoordinatorManager lookupCoordinatorManager,
DruidLeaderSelector coordLeaderSelector,
Expand All @@ -255,6 +261,7 @@ public DruidCoordinator(
this.serviceAnnouncer = serviceAnnouncer;
this.self = self;
this.indexingServiceDuties = indexingServiceDuties;
this.metadataStoreManagementDuties = metadataStoreManagementDuties;

this.exec = scheduledExecutorFactory.create(1, "Coordinator-Exec--%d");

Expand Down Expand Up @@ -665,6 +672,12 @@ private void becomeLeader()
)
);
}
dutiesRunnables.add(
Pair.of(
new DutiesRunnable(makeMetadataStoreManagementDuties(), startingLeaderCounter, METADATA_STORE_MANAGEMENT_DUTIES_DUTY_GROUP),
config.getCoordinatorMetadataStoreManagementPeriod()
)
);

for (final Pair<? extends DutiesRunnable, Duration> dutiesRunnable : dutiesRunnables) {
// CompactSegmentsDuty can takes a non trival amount of time to complete.
Expand Down Expand Up @@ -750,6 +763,19 @@ private List<CoordinatorDuty> makeIndexingServiceDuties()
return ImmutableList.copyOf(duties);
}

private List<CoordinatorDuty> makeMetadataStoreManagementDuties()
{
List<CoordinatorDuty> duties = ImmutableList.<CoordinatorDuty>builder()
.addAll(metadataStoreManagementDuties)
.build();

log.debug(
"Done making metadata store management duties %s",
duties.stream().map(duty -> duty.getClass().getName()).collect(Collectors.toList())
);
return ImmutableList.copyOf(duties);
}

private List<CoordinatorDuty> makeCompactSegmentsDuty()
{
return ImmutableList.of(compactSegments);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ public abstract class DruidCoordinatorConfig
@Default("PT1800s")
public abstract Duration getCoordinatorIndexingPeriod();

@Config("druid.coordinator.period.metadataStoreManagementPeriod")
@Default("PT1H")
public abstract Duration getCoordinatorMetadataStoreManagementPeriod();

@Config("druid.coordinator.kill.period")
@Default("P1D")
public abstract Duration getCoordinatorKillPeriod();
Expand All @@ -51,6 +55,14 @@ public abstract class DruidCoordinatorConfig
@Default("0")
public abstract int getCoordinatorKillMaxSegments();

@Config("druid.coordinator.kill.audit.period")
@Default("P1D")
public abstract Duration getCoordinatorAuditKillPeriod();

@Config("druid.coordinator.kill.audit.durationToRetain")
@Default("PT-1s")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These defaults don't seem to match the documented defaults above

Copy link
Copy Markdown
Contributor Author

@maytasm maytasm Apr 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed the docs.
druid.coordinator.kill.audit.period is not required. And the default value is P1D
druid.coordinator.kill.audit.durationToRetain is required. The default value which is -1 which causes the precondition check to fail. User should set it to a positive value. Instead of putting in the docs that the default value is -1 and that Coordinator will fail if value is negative, for simplicity, I just documented that default value is None and user must set this value.

public abstract Duration getCoordinatorAuditKillDurationToRetain();

@Config("druid.coordinator.load.timeout")
public Duration getLoadTimeoutDelay()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.server.coordinator.duty;

import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.coordinator.DruidCoordinatorConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;

public class KillAuditLog implements CoordinatorDuty
{
private static final Logger log = new Logger(KillAuditLog.class);

private final long period;
private final long retainDuration;
private long lastKillTime = 0;

private final AuditManager auditManager;

@Inject
public KillAuditLog(
AuditManager auditManager,
DruidCoordinatorConfig config
)
{
this.period = config.getCoordinatorAuditKillPeriod().getMillis();
Preconditions.checkArgument(
this.period >= config.getCoordinatorMetadataStoreManagementPeriod().getMillis(),
"coordinator audit kill period must be >= druid.coordinator.period.metadataStoreManagementPeriod"
);
this.retainDuration = config.getCoordinatorAuditKillDurationToRetain().getMillis();
Preconditions.checkArgument(this.retainDuration >= 0, "coordinator audit kill retainDuration must be >= 0");
log.debug(
"Audit Kill Task scheduling enabled with period [%s], retainDuration [%s]",
this.period,
this.retainDuration
);
this.auditManager = auditManager;
}

@Override
public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
{
if ((lastKillTime + period) < System.currentTimeMillis()) {
lastKillTime = System.currentTimeMillis();

long timestamp = System.currentTimeMillis() - retainDuration;
int auditRemoved = auditManager.removeAuditLogsOlderThan(timestamp);
ServiceEmitter emitter = params.getEmitter();
emitter.emit(
new ServiceMetricEvent.Builder().build(
"metadata/kill/audit/count",
auditRemoved
)
);
log.info("Finished running KillAuditLog duty. Removed %,d audit logs", auditRemoved);
}
return params;
}
}
Loading