Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.JvmUtils;

import javax.validation.constraints.Min;
Expand All @@ -45,7 +46,6 @@ public class BaseHttpEmittingConfig
* Do not time out in case flushTimeOut is not set
*/
public static final long DEFAULT_FLUSH_TIME_OUT = Long.MAX_VALUE;
public static final String DEFAULT_BASIC_AUTHENTICATION = null;
public static final BatchingStrategy DEFAULT_BATCHING_STRATEGY = BatchingStrategy.ARRAY;
public static final ContentEncoding DEFAULT_CONTENT_ENCODING = null;
public static final float DEFAULT_HTTP_TIMEOUT_ALLOWANCE_FACTOR = 2.0f;
Expand Down Expand Up @@ -86,7 +86,7 @@ public static Pair<Integer, Integer> getDefaultBatchSizeAndLimit(long maxMemory)
long flushTimeOut = DEFAULT_FLUSH_TIME_OUT;

@JsonProperty
String basicAuthentication = DEFAULT_BASIC_AUTHENTICATION;
PasswordProvider basicAuthentication = null;

@JsonProperty
BatchingStrategy batchingStrategy = DEFAULT_BATCHING_STRATEGY;
Expand Down Expand Up @@ -125,7 +125,7 @@ public long getFlushTimeOut()
return flushTimeOut;
}

public String getBasicAuthentication()
public PasswordProvider getBasicAuthentication()
{
return basicAuthentication;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.druid.java.util.emitter.core;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.metadata.PasswordProvider;

import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -86,7 +87,7 @@ public Builder setFlushCount(int flushCount)
return this;
}

public Builder setBasicAuthentication(String basicAuthentication)
public Builder setBasicAuthentication(PasswordProvider basicAuthentication)
{
this.basicAuthentication = basicAuthentication;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -742,7 +742,7 @@ private void send(byte[] buffer, int length) throws Exception
request.setBody(ByteBuffer.wrap(payload, 0, payloadLength));

if (config.getBasicAuthentication() != null) {
final String[] parts = config.getBasicAuthentication().split(":", 2);
final String[] parts = config.getBasicAuthentication().getPassword().split(":", 2);
final String user = parts[0];
final String password = parts.length > 1 ? parts[1] : "";
String encoded = StringUtils.encodeBase64String((user + ':' + password).getBytes(StandardCharsets.UTF_8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.emitter.service.UnitEvent;
import org.apache.druid.metadata.DefaultPasswordProvider;
import org.apache.druid.metadata.PasswordProvider;
import org.apache.druid.utils.CompressionUtils;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
Expand Down Expand Up @@ -175,7 +177,7 @@ private HttpPostEmitter sizeBasedEmitterWithContentEncoding(int size, ContentEnc
return emitter;
}

private HttpPostEmitter manualFlushEmitterWithBasicAuthenticationAndNewlineSeparating(String authentication)
private HttpPostEmitter manualFlushEmitterWithBasicAuthenticationAndNewlineSeparating(PasswordProvider authentication)
{
HttpEmitterConfig config = new HttpEmitterConfig.Builder(TARGET_URL)
.setFlushMillis(Long.MAX_VALUE)
Expand Down Expand Up @@ -439,7 +441,7 @@ public void testBasicAuthenticationAndNewlineSeparating() throws Exception
new UnitEvent("test", 1),
new UnitEvent("test", 2)
);
emitter = manualFlushEmitterWithBasicAuthenticationAndNewlineSeparating("foo:bar");
emitter = manualFlushEmitterWithBasicAuthenticationAndNewlineSeparating(new DefaultPasswordProvider("foo:bar"));

httpClient.setGoHandler(
new GoHandler()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testDefaults()
Assert.assertEquals(60000, config.getFlushMillis());
Assert.assertEquals(500, config.getFlushCount());
Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl());
Assert.assertEquals(null, config.getBasicAuthentication());
Assert.assertNull(config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Runtime.getRuntime().maxMemory()
Expand All @@ -67,7 +67,7 @@ public void testDefaultsLegacy()
Assert.assertEquals(60000, config.getFlushMillis());
Assert.assertEquals(300, config.getFlushCount());
Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl());
Assert.assertEquals(null, config.getBasicAuthentication());
Assert.assertNull(config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Runtime.getRuntime().maxMemory()
Expand Down Expand Up @@ -103,7 +103,7 @@ public void testSettingEverything()
Assert.assertEquals(1, config.getFlushMillis());
Assert.assertEquals(2, config.getFlushCount());
Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl());
Assert.assertEquals("a:b", config.getBasicAuthentication());
Assert.assertEquals("a:b", config.getBasicAuthentication().getPassword());
Assert.assertEquals(BatchingStrategy.NEWLINES, config.getBatchingStrategy());
Assert.assertEquals(4, config.getMaxBatchSize());
Assert.assertEquals(1000, config.getFlushTimeOut());
Expand Down Expand Up @@ -133,7 +133,7 @@ public void testSettingEverythingLegacy()
Assert.assertEquals(1, config.getFlushMillis());
Assert.assertEquals(2, config.getFlushCount());
Assert.assertEquals("http://example.com/", config.getRecipientBaseUrl());
Assert.assertEquals("a:b", config.getBasicAuthentication());
Assert.assertEquals("a:b", config.getBasicAuthentication().getPassword());
Assert.assertEquals(BatchingStrategy.NEWLINES, config.getBatchingStrategy());
Assert.assertEquals(4, config.getMaxBatchSize());
Assert.assertEquals(1000, config.getFlushTimeOut());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void testDefaults()
Assert.assertEquals(60000, config.getFlushMillis());
Assert.assertEquals(500, config.getFlushCount());
Assert.assertEquals("http://example.com/topic", config.getRecipientBaseUrl());
Assert.assertEquals(null, config.getBasicAuthentication());
Assert.assertNull(config.getBasicAuthentication());
Assert.assertEquals(BatchingStrategy.ARRAY, config.getBatchingStrategy());
Pair<Integer, Integer> batchConfigPair = BaseHttpEmittingConfig.getDefaultBatchSizeAndLimit(
Runtime.getRuntime().maxMemory()
Expand Down Expand Up @@ -68,7 +68,7 @@ public void testSettingEverything()
Assert.assertEquals(1, config.getFlushMillis());
Assert.assertEquals(2, config.getFlushCount());
Assert.assertEquals("http://example.com/topic", config.getRecipientBaseUrl());
Assert.assertEquals("a:b", config.getBasicAuthentication());
Assert.assertEquals("a:b", config.getBasicAuthentication().getPassword());
Assert.assertEquals(BatchingStrategy.NEWLINES, config.getBatchingStrategy());
Assert.assertEquals(4, config.getMaxBatchSize());
Assert.assertEquals(1000, config.getFlushTimeOut());
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ The Druid servers [emit various metrics](../operations/metrics.md) and alerts vi
|--------|-----------|-------|
|`druid.emitter.http.flushMillis`|How often the internal message buffer is flushed (data is sent).|60000|
|`druid.emitter.http.flushCount`|How many messages the internal message buffer can hold before flushing (sending).|500|
|`druid.emitter.http.basicAuthentication`|Login and password for authentication in "login:password" form, e.g., `druid.emitter.http.basicAuthentication=admin:adminpassword`|not specified = no authentication|
|`druid.emitter.http.basicAuthentication`|[Password Provider](../operations/password-provider.md) for providing Login and password for authentication in "login:password" form, e.g., `druid.emitter.http.basicAuthentication=admin:adminpassword` uses Default Password Provider which allows plain text passwords.|not specified = no authentication|
|`druid.emitter.http.flushTimeOut`|The timeout after which an event should be sent to the endpoint, even if internal buffers are not filled, in milliseconds.|not specified = no timeout|
|`druid.emitter.http.batchingStrategy`|The strategy of how the batch is formatted. "ARRAY" means `[event1,event2]`, "NEWLINES" means `event1\nevent2`, ONLY_EVENTS means `event1event2`.|ARRAY|
|`druid.emitter.http.maxBatchSize`|The maximum batch size, in bytes.|the minimum of (10% of JVM heap size divided by 2) or (5191680 (i. e. 5 MB))|
Expand Down