Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.api.client.http.ByteArrayContent;
import com.google.api.client.http.GZipEncoding;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler;
import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler.BackOffRequired;
Expand Down Expand Up @@ -104,6 +105,8 @@ static Builder newBuilder() {

abstract Boolean disableCertificateValidation();

abstract Boolean enableGzipHttpCompression();

/**
* Executes a POST for the list of {@link SplunkEvent} objects into Splunk's Http Event Collector
* endpoint.
Expand All @@ -116,6 +119,10 @@ HttpResponse execute(List<SplunkEvent> events) throws IOException {
HttpContent content = getContent(events);
HttpRequest request = requestFactory().buildPostRequest(genericUrl(), content);

if (enableGzipHttpCompression()) {
request.setEncoding(new GZipEncoding());
}

HttpBackOffUnsuccessfulResponseHandler responseHandler =
new HttpBackOffUnsuccessfulResponseHandler(getConfiguredBackOff());

Expand Down Expand Up @@ -208,6 +215,8 @@ abstract static class Builder {

abstract Boolean disableCertificateValidation();

abstract Builder setEnableGzipHttpCompression(Boolean enableGzipHttpCompression);

abstract Builder setMaxElapsedMillis(Integer maxElapsedMillis);

abstract Integer maxElapsedMillis();
Expand Down Expand Up @@ -250,6 +259,19 @@ Builder withDisableCertificateValidation(Boolean disableCertificateValidation) {
return setDisableCertificateValidation(disableCertificateValidation);
}

/**
* Method to specify if HTTP requests sent to Splunk HEC should be GZIP encoded.
*
* @param enableGzipHttpCompression whether to enable Gzip encoding.
* @return {@link Builder}
*/
public Builder withEnableGzipHttpCompression(Boolean enableGzipHttpCompression) {
checkNotNull(
enableGzipHttpCompression,
"withEnableGzipHttpCompression(enableGzipHttpCompression) called with null input.");
return setEnableGzipHttpCompression(enableGzipHttpCompression);
}

/**
* Method to max timeout for {@link ExponentialBackOff}. Otherwise uses the default setting for
* {@link ExponentialBackOff}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.List;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.state.BagState;
Expand Down Expand Up @@ -64,6 +66,8 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer, SplunkEvent>, SplunkWr

private static final Integer DEFAULT_BATCH_COUNT = 1;
private static final Boolean DEFAULT_DISABLE_CERTIFICATE_VALIDATION = false;
private static final Boolean DEFAULT_ENABLE_BATCH_LOGS = true;
private static final Boolean DEFAULT_ENABLE_GZIP_HTTP_COMPRESSION = true;
private static final Logger LOG = LoggerFactory.getLogger(SplunkEventWriter.class);
private static final long DEFAULT_FLUSH_DELAY = 2;
private static final Counter INPUT_COUNTER =
Expand All @@ -72,6 +76,18 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer, SplunkEvent>, SplunkWr
Metrics.counter(SplunkEventWriter.class, "outbound-successful-events");
private static final Counter FAILED_WRITES =
Metrics.counter(SplunkEventWriter.class, "outbound-failed-events");
private static final Counter INVALID_REQUESTS =
Metrics.counter(SplunkEventWriter.class, "http-invalid-requests");
private static final Counter SERVER_ERROR_REQUESTS =
Metrics.counter(SplunkEventWriter.class, "http-server-error-requests");
private static final Counter VALID_REQUESTS =
Metrics.counter(SplunkEventWriter.class, "http-valid-requests");
private static final Distribution SUCCESSFUL_WRITE_LATENCY_MS =
Metrics.distribution(SplunkEventWriter.class, "successful_write_to_splunk_latency_ms");
private static final Distribution UNSUCCESSFUL_WRITE_LATENCY_MS =
Metrics.distribution(SplunkEventWriter.class, "unsuccessful_write_to_splunk_latency_ms");
private static final Distribution SUCCESSFUL_WRITE_BATCH_SIZE =
Metrics.distribution(SplunkEventWriter.class, "write_to_splunk_batch");
private static final String BUFFER_STATE_NAME = "buffer";
private static final String COUNT_STATE_NAME = "count";
private static final String TIME_ID_NAME = "expiry";
Expand All @@ -88,6 +104,8 @@ abstract class SplunkEventWriter extends DoFn<KV<Integer, SplunkEvent>, SplunkWr
private Integer batchCount;
private Boolean disableValidation;
private HttpEventPublisher publisher;
private Boolean enableBatchLogs;
private Boolean enableGzipHttpCompression;

private static final Gson GSON =
new GsonBuilder().setFieldNamingStrategy(f -> f.getName().toLowerCase()).create();
Expand All @@ -105,6 +123,10 @@ static Builder newBuilder() {

abstract @Nullable ValueProvider<Integer> inputBatchCount();

abstract @Nullable ValueProvider<Boolean> enableBatchLogs();

abstract @Nullable ValueProvider<Boolean> enableGzipHttpCompression();

@Setup
public void setup() {

Expand Down Expand Up @@ -134,12 +156,36 @@ public void setup() {
LOG.info("Disable certificate validation set to: {}", disableValidation);
}

// Either user supplied or default enableBatchLogs.
if (enableBatchLogs == null) {

if (enableBatchLogs() != null) {
enableBatchLogs = enableBatchLogs().get();
}

enableBatchLogs = MoreObjects.firstNonNull(enableBatchLogs, DEFAULT_ENABLE_BATCH_LOGS);
LOG.info("Enable Batch logs set to: {}", enableBatchLogs);
}

// Either user supplied or default enableGzipHttpCompression.
if (enableGzipHttpCompression == null) {

if (enableGzipHttpCompression() != null) {
enableGzipHttpCompression = enableGzipHttpCompression().get();
}

enableGzipHttpCompression =
MoreObjects.firstNonNull(enableGzipHttpCompression, DEFAULT_ENABLE_GZIP_HTTP_COMPRESSION);
LOG.info("Enable gzip http compression set to: {}", enableGzipHttpCompression);
}

try {
HttpEventPublisher.Builder builder =
HttpEventPublisher.newBuilder()
.withUrl(url().get())
.withToken(token().get())
.withDisableCertificateValidation(disableValidation);
.withDisableCertificateValidation(disableValidation)
.withEnableGzipHttpCompression(enableGzipHttpCompression);

publisher = builder.build();
LOG.info("Successfully created HttpEventPublisher");
Expand Down Expand Up @@ -172,8 +218,9 @@ public void processElement(
timer.offset(Duration.standardSeconds(DEFAULT_FLUSH_DELAY)).setRelative();

if (count >= batchCount) {

LOG.info("Flushing batch of {} events", count);
if (enableBatchLogs) {
LOG.info("Flushing batch of {} events", count);
}
flush(receiver, bufferState, countState);
}
}
Expand All @@ -186,7 +233,9 @@ public void onExpiry(
throws IOException {

if (MoreObjects.<Long>firstNonNull(countState.read(), 0L) > 0) {
LOG.info("Flushing window with {} events", countState.read());
if (enableBatchLogs) {
LOG.info("Flushing window with {} events", countState.read());
}
flush(receiver, bufferState, countState);
}
}
Expand Down Expand Up @@ -219,18 +268,38 @@ private void flush(

HttpResponse response = null;
List<SplunkEvent> events = Lists.newArrayList(bufferState.read());
long startTime = System.nanoTime();
try {
// Important to close this response to avoid connection leak.
response = publisher.execute(events);

if (!response.isSuccessStatusCode()) {
UNSUCCESSFUL_WRITE_LATENCY_MS.update(System.nanoTime() - startTime);
FAILED_WRITES.inc(countState.read());
int statusCode = response.getStatusCode();
if (statusCode >= 400 && statusCode < 500) {
INVALID_REQUESTS.inc();
} else if (statusCode >= 500 && statusCode < 600) {
SERVER_ERROR_REQUESTS.inc();
}

logWriteFailures(
countState,
response.getStatusCode(),
response.parseAsString(),
response.getStatusMessage());
flushWriteFailures(
events, response.getStatusMessage(), response.getStatusCode(), receiver);
logWriteFailures(countState);

} else {
LOG.info("Successfully wrote {} events", countState.read());
SUCCESSFUL_WRITE_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime);
SUCCESS_WRITES.inc(countState.read());
VALID_REQUESTS.inc();
SUCCESSFUL_WRITE_BATCH_SIZE.update(countState.read());

if (enableBatchLogs) {
LOG.info("Successfully wrote {} events", countState.read());
}
}

} catch (HttpResponseException e) {
Expand All @@ -239,13 +308,26 @@ private void flush(
e.getStatusCode(),
e.getContent(),
e.getStatusMessage());
logWriteFailures(countState);
UNSUCCESSFUL_WRITE_LATENCY_MS.update(System.nanoTime() - startTime);
FAILED_WRITES.inc(countState.read());
int statusCode = e.getStatusCode();
if (statusCode >= 400 && statusCode < 500) {
INVALID_REQUESTS.inc();
} else if (statusCode >= 500 && statusCode < 600) {
SERVER_ERROR_REQUESTS.inc();
}

logWriteFailures(countState, e.getStatusCode(), e.getContent(), e.getStatusMessage());

flushWriteFailures(events, e.getStatusMessage(), e.getStatusCode(), receiver);

} catch (IOException ioe) {
LOG.error("Error writing to Splunk: {}", ioe.getMessage());
logWriteFailures(countState);
UNSUCCESSFUL_WRITE_LATENCY_MS.update(System.nanoTime() - startTime);
FAILED_WRITES.inc(countState.read());
INVALID_REQUESTS.inc();

logWriteFailures(countState, 0, ioe.getMessage(), null);

flushWriteFailures(events, ioe.getMessage(), null, receiver);

Expand All @@ -262,10 +344,20 @@ private void flush(
}
}

/** Logs write failures and update {@link Counter}. */
private void logWriteFailures(@StateId(COUNT_STATE_NAME) ValueState<Long> countState) {
LOG.error("Failed to write {} events", countState.read());
FAILED_WRITES.inc(countState.read());
/** Utility method to log write failures. */
private void logWriteFailures(
@StateId(COUNT_STATE_NAME) ValueState<Long> countState,
int statusCode,
String content,
String statusMessage) {
if (enableBatchLogs) {
LOG.error("Failed to write {} events", countState.read());
}
LOG.error(
"Error writing to Splunk. StatusCode: {}, content: {}, StatusMessage: {}",
statusCode,
content,
statusMessage);
}

/**
Expand Down Expand Up @@ -318,6 +410,10 @@ abstract static class Builder {
abstract Builder setDisableCertificateValidation(
ValueProvider<Boolean> disableCertificateValidation);

abstract Builder setEnableBatchLogs(ValueProvider<Boolean> enableBatchLogs);

abstract Builder setEnableGzipHttpCompression(ValueProvider<Boolean> enableGzipHttpCompression);

abstract Builder setInputBatchCount(ValueProvider<Integer> inputBatchCount);

abstract SplunkEventWriter autoBuild();
Expand Down Expand Up @@ -386,6 +482,26 @@ Builder withDisableCertificateValidation(ValueProvider<Boolean> disableCertifica
return setDisableCertificateValidation(disableCertificateValidation);
}

/**
* Method to enable batch logs.
*
* @param enableBatchLogs for enabling batch logs.
* @return {@link Builder}
*/
public Builder withEnableBatchLogs(ValueProvider<Boolean> enableBatchLogs) {
return setEnableBatchLogs(enableBatchLogs);
}

/**
* Method to specify if HTTP requests sent to Splunk should be GZIP encoded.
*
* @param enableGzipHttpCompression whether to enable Gzip encoding.
* @return {@link Builder}
*/
public Builder withEnableGzipHttpCompression(ValueProvider<Boolean> enableGzipHttpCompression) {
return setEnableGzipHttpCompression(enableGzipHttpCompression);
}

/** Builds a new {@link SplunkEventWriter} objects based on the configuration. */
SplunkEventWriter build() {
checkNotNull(url(), "url needs to be provided.");
Expand Down
Loading