Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion api/src/main/java/io/druid/guice/JsonConfigurator.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import javax.validation.Validator;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -93,7 +94,7 @@ public <T> T configurate(Properties props, String propertyPrefix, Class<T> clazz
value = propValue;
}

jsonMap.put(prop.substring(propertyBase.length()), value);
hieraricalPutValue(propertyPrefix, prop, prop.substring(propertyBase.length()), value, jsonMap);
}
}

Expand Down Expand Up @@ -165,6 +166,43 @@ public Message apply(String input)
return config;
}

private static void hieraricalPutValue(
String propertyPrefix,
String originalProperty,
String property,
Object value,
Map<String, Object> targetMap
)
{
int dotIndex = property.indexOf('.');
if (dotIndex < 0) {
targetMap.put(property, value);
return;
}
if (dotIndex == 0) {
throw new ProvisionException(StringUtils.format("Double dot in property: %s", originalProperty));
}
if (dotIndex == property.length() - 1) {
throw new ProvisionException(StringUtils.format("Dot at the end of property: %s", originalProperty));
}
String nestedKey = property.substring(0, dotIndex);
Object nested = targetMap.computeIfAbsent(nestedKey, k -> new HashMap<String, Object>());
if (!(nested instanceof Map)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that targetMap passed is a method local object, why can't we simple do it like...

if (targetMap.containsKey(nestedKey)) {
..
} else {
  ..
}

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The value may be a Map or a String, !(nested instanceof Map) checks that the nested map couldn't be populated

// Clash is possible between properties, which are used to configure different objects: e. g.
// druid.emitter=parametrized is used to configure Emitter class, and druid.emitter.parametrized.xxx=yyy is used
// to configure ParametrizedUriEmitterConfig object. So skipping xxx=yyy key-value pair when configuring Emitter
// doesn't make any difference. That is why we just log this situation, instead of throwing an exception.
log.info(
"Skipping %s property: one of it's prefixes is also used as a property key. Prefix: %s",
originalProperty,
propertyPrefix
);
return;
}
Map<String, Object> nestedMap = (Map<String, Object>) nested;
hieraricalPutValue(propertyPrefix, originalProperty, property.substring(dotIndex + 1), value, nestedMap);
}

@VisibleForTesting
public static <T> void verifyClazzIsConfigurable(ObjectMapper mapper, Class<T> clazz)
{
Expand Down
22 changes: 19 additions & 3 deletions docs/content/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ The Druid servers [emit various metrics](../operations/metrics.html) and alerts

|Property|Description|Default|
|--------|-----------|-------|
|`druid.emitter`|Setting this value to "noop", "logging", or "http" will initialize one of the emitter modules. value "composing" can be used to initialize multiple emitter modules. |noop|
|`druid.emitter`|Setting this value to "noop", "logging", "http" or "parametrized" will initialize one of the emitter modules. value "composing" can be used to initialize multiple emitter modules. |noop|

#### Logging Emitter Module

Expand All @@ -216,10 +216,26 @@ The Druid servers [emit various metrics](../operations/metrics.html) and alerts

|Property|Description|Default|
|--------|-----------|-------|
|`druid.emitter.http.timeOut`|The timeout for data reads.|PT5M|
|`druid.emitter.http.readTimeout`|The timeout for data reads.|PT5M|
|`druid.emitter.http.flushMillis`|How often the internal message buffer is flushed (data is sent).|60000|
|`druid.emitter.http.flushCount`|How many messages the internal message buffer can hold before flushing (sending).|500|
|`druid.emitter.http.recipientBaseUrl`|The base URL to emit messages to. Druid will POST JSON to be consumed at the HTTP endpoint specified by this property.|none|
|`druid.emitter.http.basicAuthentication`|Login and password for authentification in "login:password" form, e. g. `druid.emitter.http.basicAuthentication=admin:adminpassword`|not specified = no authentification|
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wish list: this to be a PasswordProvider

|`druid.emitter.http.flushTimeOut|The timeout after which an event should be sent to the endpoint, even if internal buffers are not filled, in milliseconds.|not specified = no timeout|
|`druid.emitter.http.batchingStrategy`|The strategy of how the batch is formatted. "ARRAY" means `[event1,event2]`, "NEWLINES" means `event1\nevent2`, ONLY_EVENTS means `event1event2`.|ARRAY|
|`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|5191680 (i. e. 5 MB)|
|`druid.emitter.http.recipientBaseUrl`|The base URL to emit messages to. Druid will POST JSON to be consumed at the HTTP endpoint specified by this property.|none, required config|

#### Parametrized Http Emitter Module

`druid.emitter.parametrized.httpEmitting.*` configs correspond to the configs of Http Emitter Modules, see above.
Except `readTimeout` and `recipientBaseUrl`. E. g. `druid.emitter.parametrized.httpEmitting.flushMillis`,
`druid.emitter.parametrized.httpEmitting.flushCount`, etc.

The additional configs are:
|Property|Description|Default|
|--------|-----------|-------|
|`druid.emitter.parametrized.readTimeout`|The timeout for data reads.|PT5M|
|`druid.emitter.parametrized.recipientBaseUrlPattern`|The URL pattern to send an event to, based on the event's feed. E. g. `http://foo.bar/{feed}`, that will send event to `http://foo.bar/metrics` if the event's feed is "metrics".|none, required config|

#### Composing Emitter Module

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@
<dependency>
<groupId>com.metamx</groupId>
<artifactId>emitter</artifactId>
<version>0.4.5</version>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.storage.derby.DerbyMetadataStorageDruidModule;
import io.druid.server.initialization.EmitterModule;
import io.druid.server.emitter.EmitterModule;
import io.druid.server.initialization.jetty.JettyServerModule;
import io.druid.server.metrics.MetricsModule;
import org.apache.commons.io.FileUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.fasterxml.jackson.databind.Module;
import com.google.common.base.Function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.google.common.base.Strings;
import com.google.common.base.Supplier;
Expand Down Expand Up @@ -72,6 +72,7 @@ public void configure(Binder binder)
binder.install(new NoopEmitterModule());
binder.install(new LogEmitterModule());
binder.install(new HttpEmitterModule());
binder.install(new ParametrizedUriEmitterModule());
binder.install(new ComposingEmitterModule());

binder.bind(Emitter.class).toProvider(new EmitterProvider(emitterType)).in(LazySingleton.class);
Expand All @@ -87,6 +88,7 @@ public ServiceEmitter getServiceEmitter(@Self Supplier<DruidNode> configSupplier
"version",
Strings.nullToEmpty(version) // Version is null during `mvn test`.
);
log.info("Underlying emitter for ServiceEmitter: %s", emitter);
final ServiceEmitter retVal = new ServiceEmitter(
config.getServiceName(),
config.getHostAndPortToUse(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;
Expand All @@ -27,10 +27,10 @@
public class HttpEmitterConfig extends com.metamx.emitter.core.HttpEmitterConfig
{
@JsonProperty
private Period timeOut = new Period("PT5M");
private Period readTimeout = new Period("PT5M");

public Period getReadTimeout()
{
return timeOut;
return readTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
Expand Down Expand Up @@ -49,6 +49,11 @@ public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.emitter.http", HttpEmitterConfig.class);

configureSsl(binder);
}

static void configureSsl(Binder binder)
{
final SSLContext context;
try {
context = SSLContext.getDefault();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package io.druid.server.initialization;
package io.druid.server.emitter;

import com.google.inject.Binder;
import com.google.inject.Module;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.emitter;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.Period;

public class ParametrizedUriEmitterConfig extends com.metamx.emitter.core.ParametrizedUriEmitterConfig
{
@JsonProperty
private Period readTimeout = new Period("PT5M");

public Period getReadTimeout()
{
return readTimeout;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.server.emitter;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Supplier;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.name.Named;
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.ParametrizedUriEmitter;
import com.metamx.http.client.HttpClientConfig;
import com.metamx.http.client.HttpClientInit;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.ManageLifecycle;
import io.druid.guice.http.LifecycleUtils;
import io.druid.java.util.common.lifecycle.Lifecycle;

import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;

public class ParametrizedUriEmitterModule implements Module
{
@Override
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.emitter.parametrized", ParametrizedUriEmitterConfig.class);
HttpEmitterModule.configureSsl(binder);
}

@Provides
@ManageLifecycle
@Named("parametrized")
public Emitter getEmitter(
Supplier<ParametrizedUriEmitterConfig> config,
@Nullable SSLContext sslContext,
Lifecycle lifecycle,
ObjectMapper jsonMapper
)
{
final HttpClientConfig.Builder builder = HttpClientConfig
.builder()
.withNumConnections(1)
.withReadTimeout(config.get().getReadTimeout().toStandardDuration());
if (sslContext != null) {
builder.withSslContext(sslContext);
}
return new ParametrizedUriEmitter(
config.get(),
HttpClientInit.createClient(builder.build(), LifecycleUtils.asMmxLifecycle(lifecycle)),
jsonMapper
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.java.util.common.logger.Logger;
import io.druid.server.RequestLogLine;

import javax.validation.constraints.NotNull;
Expand All @@ -35,6 +36,8 @@
@JsonTypeName("composing")
public class ComposingRequestLoggerProvider implements RequestLoggerProvider
{
private static final Logger log = new Logger(ComposingRequestLoggerProvider.class);

@JsonProperty
@NotNull
private final List<RequestLoggerProvider> loggerProviders = Lists.newArrayList();
Expand All @@ -46,7 +49,9 @@ public RequestLogger get()
for (RequestLoggerProvider loggerProvider : loggerProviders) {
loggers.add(loggerProvider.get());
}
return new ComposingRequestLogger(loggers);
ComposingRequestLogger logger = new ComposingRequestLogger(loggers);
log.debug(new Exception("Stack trace"), "Creating %s at", logger);
return logger;
}

public static class ComposingRequestLogger implements RequestLogger
Expand Down Expand Up @@ -79,6 +84,14 @@ public void log(RequestLogLine requestLogLine) throws IOException
throw Throwables.propagate(exception);
}
}

@Override
public String toString()
{
return "ComposingRequestLogger{" +
"loggers=" + loggers +
'}';
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ public void log(final RequestLogLine requestLogLine) throws IOException
emitter.emit(new RequestLogEventBuilder(feed, requestLogLine));
}

@Override
public String toString()
{
return "EmittingRequestLogger{" +
"emitter=" + emitter +
", feed='" + feed + '\'' +
'}';
}

private static class RequestLogEvent implements Event
{
final ImmutableMap<String, String> serviceDimensions;
Expand Down
Loading