Skip to content
Merged
17 changes: 14 additions & 3 deletions core/src/main/java/org/apache/druid/audit/AuditManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.audit;


import org.apache.druid.common.config.ConfigSerde;
import org.joda.time.Interval;
import org.skife.jdbi.v2.Handle;

Expand All @@ -28,15 +29,25 @@

public interface AuditManager
{
/**
* This String is the default message stored instead of the actual audit payload if the audit payload size
* exceeded the maximum size limit configuration
*/
String PAYLOAD_SKIP_MSG_FORMAT = "Payload was not stored as its size exceeds the limit [%d] configured by druid.audit.manager.maxPayloadSizeBytes";

String X_DRUID_AUTHOR = "X-Druid-Author";

String X_DRUID_COMMENT = "X-Druid-Comment";

/**
* inserts an audit Entry in the Audit Table
* @param auditEntry
* inserts an audit entry in the Audit Table
* @param key of the audit entry
* @param type of the audit entry
* @param auditInfo of the audit entry
* @param payload of the audit entry
* @param configSerde of the payload of the audit entry
*/
void doAudit(AuditEntry auditEntry);
<T> void doAudit(String key, String type, AuditInfo auditInfo, T payload, ConfigSerde<T> configSerde);

/**
* inserts an audit Entry in audit table using the handler provided
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@
public interface ConfigSerde<T>
{
byte[] serialize(T obj);
String serializeToString(T obj);
/**
* Serialize object to String
*
* @param obj to be serialize
* @param skipNull if true, then skip serialization of any field with null value.
* This can be used to reduce the size of the resulting String.
* @return String serialization of the input
*/
String serializeToString(T obj, boolean skipNull);
T deserialize(byte[] bytes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import org.apache.druid.audit.AuditEntry;
import org.apache.druid.audit.AuditInfo;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.common.config.ConfigManager.SetResult;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.JsonNonNull;
import org.apache.druid.java.util.common.jackson.JacksonUtils;

import java.io.IOException;
Expand All @@ -38,18 +40,21 @@ public class JacksonConfigManager
{
private final ConfigManager configManager;
private final ObjectMapper jsonMapper;
private final ObjectMapper jsonMapperSkipNull;
private final AuditManager auditManager;

@Inject
public JacksonConfigManager(
ConfigManager configManager,
ObjectMapper jsonMapper,
@Json ObjectMapper jsonMapper,
@JsonNonNull ObjectMapper jsonMapperOnlyNonNullValue,
AuditManager auditManager
)
{
this.configManager = configManager;
this.jsonMapper = jsonMapper;
this.auditManager = auditManager;
this.jsonMapperSkipNull = jsonMapperOnlyNonNullValue;
}

public <T> AtomicReference<T> watch(String key, Class<? extends T> clazz)
Expand All @@ -72,18 +77,12 @@ public <T> SetResult set(String key, T val, AuditInfo auditInfo)
ConfigSerde configSerde = create(val.getClass(), null);
// Audit and actual config change are done in separate transactions
// there can be phantom audits and reOrdering in audit changes as well.
auditManager.doAudit(
AuditEntry.builder()
.key(key)
.type(key)
.auditInfo(auditInfo)
.payload(configSerde.serializeToString(val))
.build()
);
auditManager.doAudit(key, key, auditInfo, val, configSerde);
return configManager.set(key, configSerde, val);
}

private <T> ConfigSerde<T> create(final Class<? extends T> clazz, final T defaultVal)
@VisibleForTesting
<T> ConfigSerde<T> create(final Class<? extends T> clazz, final T defaultVal)
{
return new ConfigSerde<T>()
{
Expand All @@ -99,10 +98,10 @@ public byte[] serialize(T obj)
}

@Override
public String serializeToString(T obj)
public String serializeToString(T obj, boolean skipNull)
{
try {
return jsonMapper.writeValueAsString(obj);
return skipNull ? jsonMapperSkipNull.writeValueAsString(obj) : jsonMapper.writeValueAsString(obj);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
Expand All @@ -121,7 +120,8 @@ public T deserialize(byte[] bytes)
};
}

private <T> ConfigSerde<T> create(final TypeReference<? extends T> clazz, final T defaultVal)
@VisibleForTesting
<T> ConfigSerde<T> create(final TypeReference<? extends T> clazz, final T defaultVal)
{
return new ConfigSerde<T>()
{
Expand All @@ -137,10 +137,10 @@ public byte[] serialize(T obj)
}

@Override
public String serializeToString(T obj)
public String serializeToString(T obj, boolean skipNull)
{
try {
return jsonMapper.writeValueAsString(obj);
return skipNull ? jsonMapperSkipNull.writeValueAsString(obj) : jsonMapper.writeValueAsString(obj);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.inject.Module;
import com.google.inject.Provides;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.JsonNonNull;
import org.apache.druid.guice.annotations.Smile;
import org.skife.config.ConfigurationObjectFactory;

Expand All @@ -41,6 +42,7 @@ public class DruidSecondaryModule implements Module
private final Properties properties;
private final ConfigurationObjectFactory factory;
private final ObjectMapper jsonMapper;
private final ObjectMapper jsonMapperOnlyNonNullValueSerialization;
private final ObjectMapper smileMapper;
private final Validator validator;

Expand All @@ -49,13 +51,15 @@ public DruidSecondaryModule(
Properties properties,
ConfigurationObjectFactory factory,
@Json ObjectMapper jsonMapper,
@JsonNonNull ObjectMapper jsonMapperOnlyNonNullValueSerialization,
@Smile ObjectMapper smileMapper,
Validator validator
)
{
this.properties = properties;
this.factory = factory;
this.jsonMapper = jsonMapper;
this.jsonMapperOnlyNonNullValueSerialization = jsonMapperOnlyNonNullValueSerialization;
this.smileMapper = smileMapper;
this.validator = validator;
}
Expand All @@ -78,6 +82,13 @@ public ObjectMapper getJsonMapper(final Injector injector)
return jsonMapper;
}

@Provides @LazySingleton @JsonNonNull
public ObjectMapper getJsonMapperOnlyNonNullValueSerialization(final Injector injector)
{
setupJackson(injector, jsonMapperOnlyNonNullValueSerialization);
return jsonMapperOnlyNonNullValueSerialization;
}

@Provides @LazySingleton @Smile
public ObjectMapper getSmileMapper(Injector injector)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.guice.annotations;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* The ObjectMapper of this annotation will skip serialization of any field with null value.
*/
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@BindingAnnotation
@PublicApi
public @interface JsonNonNull
{
}
Loading