diff --git a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java index c24b16c7b733..8c4613a6d21b 100644 --- a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java +++ b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/HttpEventPublisher.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.api.client.http.ByteArrayContent; +import com.google.api.client.http.GZipEncoding; import com.google.api.client.http.GenericUrl; import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler; import com.google.api.client.http.HttpBackOffUnsuccessfulResponseHandler.BackOffRequired; @@ -104,6 +105,8 @@ static Builder newBuilder() { abstract Boolean disableCertificateValidation(); + abstract Boolean enableGzipHttpCompression(); + /** * Executes a POST for the list of {@link SplunkEvent} objects into Splunk's Http Event Collector * endpoint. 
@@ -116,6 +119,10 @@ HttpResponse execute(List events) throws IOException { HttpContent content = getContent(events); HttpRequest request = requestFactory().buildPostRequest(genericUrl(), content); + if (enableGzipHttpCompression()) { + request.setEncoding(new GZipEncoding()); + } + HttpBackOffUnsuccessfulResponseHandler responseHandler = new HttpBackOffUnsuccessfulResponseHandler(getConfiguredBackOff()); @@ -208,6 +215,8 @@ abstract static class Builder { abstract Boolean disableCertificateValidation(); + abstract Builder setEnableGzipHttpCompression(Boolean enableGzipHttpCompression); + abstract Builder setMaxElapsedMillis(Integer maxElapsedMillis); abstract Integer maxElapsedMillis(); @@ -250,6 +259,19 @@ Builder withDisableCertificateValidation(Boolean disableCertificateValidation) { return setDisableCertificateValidation(disableCertificateValidation); } + /** + * Method to specify if HTTP requests sent to Splunk HEC should be GZIP encoded. + * + * @param enableGzipHttpCompression whether to enable Gzip encoding. + * @return {@link Builder} + */ + public Builder withEnableGzipHttpCompression(Boolean enableGzipHttpCompression) { + checkNotNull( + enableGzipHttpCompression, + "withEnableGzipHttpCompression(enableGzipHttpCompression) called with null input."); + return setEnableGzipHttpCompression(enableGzipHttpCompression); + } + /** * Method to max timeout for {@link ExponentialBackOff}. Otherwise uses the default setting for * {@link ExponentialBackOff}. 
diff --git a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java index f790d8b42d9c..00b0a0ea8f55 100644 --- a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java +++ b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkEventWriter.java @@ -30,8 +30,10 @@ import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; +import java.time.Instant; import java.util.List; import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.state.BagState; @@ -64,6 +66,8 @@ abstract class SplunkEventWriter extends DoFn, SplunkWr private static final Integer DEFAULT_BATCH_COUNT = 1; private static final Boolean DEFAULT_DISABLE_CERTIFICATE_VALIDATION = false; + private static final Boolean DEFAULT_ENABLE_BATCH_LOGS = true; + private static final Boolean DEFAULT_ENABLE_GZIP_HTTP_COMPRESSION = true; private static final Logger LOG = LoggerFactory.getLogger(SplunkEventWriter.class); private static final long DEFAULT_FLUSH_DELAY = 2; private static final Counter INPUT_COUNTER = @@ -72,6 +76,18 @@ abstract class SplunkEventWriter extends DoFn, SplunkWr Metrics.counter(SplunkEventWriter.class, "outbound-successful-events"); private static final Counter FAILED_WRITES = Metrics.counter(SplunkEventWriter.class, "outbound-failed-events"); + private static final Counter INVALID_REQUESTS = + Metrics.counter(SplunkEventWriter.class, "http-invalid-requests"); + private static final Counter SERVER_ERROR_REQUESTS = + Metrics.counter(SplunkEventWriter.class, "http-server-error-requests"); + private static final Counter VALID_REQUESTS = + Metrics.counter(SplunkEventWriter.class, "http-valid-requests"); + 
private static final Distribution SUCCESSFUL_WRITE_LATENCY_MS = + Metrics.distribution(SplunkEventWriter.class, "successful_write_to_splunk_latency_ms"); + private static final Distribution UNSUCCESSFUL_WRITE_LATENCY_MS = + Metrics.distribution(SplunkEventWriter.class, "unsuccessful_write_to_splunk_latency_ms"); + private static final Distribution SUCCESSFUL_WRITE_BATCH_SIZE = + Metrics.distribution(SplunkEventWriter.class, "write_to_splunk_batch"); private static final String BUFFER_STATE_NAME = "buffer"; private static final String COUNT_STATE_NAME = "count"; private static final String TIME_ID_NAME = "expiry"; @@ -88,6 +104,8 @@ abstract class SplunkEventWriter extends DoFn, SplunkWr private Integer batchCount; private Boolean disableValidation; private HttpEventPublisher publisher; + private Boolean enableBatchLogs; + private Boolean enableGzipHttpCompression; private static final Gson GSON = new GsonBuilder().setFieldNamingStrategy(f -> f.getName().toLowerCase()).create(); @@ -105,6 +123,10 @@ static Builder newBuilder() { abstract @Nullable ValueProvider inputBatchCount(); + abstract @Nullable ValueProvider enableBatchLogs(); + + abstract @Nullable ValueProvider enableGzipHttpCompression(); + @Setup public void setup() { @@ -134,12 +156,36 @@ public void setup() { LOG.info("Disable certificate validation set to: {}", disableValidation); } + // Either user supplied or default enableBatchLogs. + if (enableBatchLogs == null) { + + if (enableBatchLogs() != null) { + enableBatchLogs = enableBatchLogs().get(); + } + + enableBatchLogs = MoreObjects.firstNonNull(enableBatchLogs, DEFAULT_ENABLE_BATCH_LOGS); + LOG.info("Enable Batch logs set to: {}", enableBatchLogs); + } + + // Either user supplied or default enableGzipHttpCompression. 
+ if (enableGzipHttpCompression == null) { + + if (enableGzipHttpCompression() != null) { + enableGzipHttpCompression = enableGzipHttpCompression().get(); + } + + enableGzipHttpCompression = + MoreObjects.firstNonNull(enableGzipHttpCompression, DEFAULT_ENABLE_GZIP_HTTP_COMPRESSION); + LOG.info("Enable gzip http compression set to: {}", enableGzipHttpCompression); + } + try { HttpEventPublisher.Builder builder = HttpEventPublisher.newBuilder() .withUrl(url().get()) .withToken(token().get()) - .withDisableCertificateValidation(disableValidation); + .withDisableCertificateValidation(disableValidation) + .withEnableGzipHttpCompression(enableGzipHttpCompression); publisher = builder.build(); LOG.info("Successfully created HttpEventPublisher"); @@ -172,8 +218,9 @@ public void processElement( timer.offset(Duration.standardSeconds(DEFAULT_FLUSH_DELAY)).setRelative(); if (count >= batchCount) { - - LOG.info("Flushing batch of {} events", count); + if (enableBatchLogs) { + LOG.info("Flushing batch of {} events", count); + } flush(receiver, bufferState, countState); } } @@ -186,7 +233,9 @@ public void onExpiry( throws IOException { if (MoreObjects.firstNonNull(countState.read(), 0L) > 0) { - LOG.info("Flushing window with {} events", countState.read()); + if (enableBatchLogs) { + LOG.info("Flushing window with {} events", countState.read()); + } flush(receiver, bufferState, countState); } } @@ -219,18 +268,38 @@ private void flush( HttpResponse response = null; List events = Lists.newArrayList(bufferState.read()); + long startTime = System.nanoTime(); try { // Important to close this response to avoid connection leak. 
response = publisher.execute(events); if (!response.isSuccessStatusCode()) { + UNSUCCESSFUL_WRITE_LATENCY_MS.update((System.nanoTime() - startTime) / 1_000_000L); + FAILED_WRITES.inc(countState.read()); + int statusCode = response.getStatusCode(); + if (statusCode >= 400 && statusCode < 500) { + INVALID_REQUESTS.inc(); + } else if (statusCode >= 500 && statusCode < 600) { + SERVER_ERROR_REQUESTS.inc(); + } + + logWriteFailures( + countState, + response.getStatusCode(), + response.parseAsString(), + response.getStatusMessage()); flushWriteFailures( events, response.getStatusMessage(), response.getStatusCode(), receiver); - logWriteFailures(countState); } else { - LOG.info("Successfully wrote {} events", countState.read()); + SUCCESSFUL_WRITE_LATENCY_MS.update((System.nanoTime() - startTime) / 1_000_000L); SUCCESS_WRITES.inc(countState.read()); + VALID_REQUESTS.inc(); + SUCCESSFUL_WRITE_BATCH_SIZE.update(countState.read()); + + if (enableBatchLogs) { + LOG.info("Successfully wrote {} events", countState.read()); + } } } catch (HttpResponseException e) { @@ -239,13 +308,26 @@ private void flush( e.getStatusCode(), e.getContent(), e.getStatusMessage()); - logWriteFailures(countState); + UNSUCCESSFUL_WRITE_LATENCY_MS.update((System.nanoTime() - startTime) / 1_000_000L); + FAILED_WRITES.inc(countState.read()); + int statusCode = e.getStatusCode(); + if (statusCode >= 400 && statusCode < 500) { + INVALID_REQUESTS.inc(); + } else if (statusCode >= 500 && statusCode < 600) { + SERVER_ERROR_REQUESTS.inc(); + } + + logWriteFailures(countState, e.getStatusCode(), e.getContent(), e.getStatusMessage()); flushWriteFailures(events, e.getStatusMessage(), e.getStatusCode(), receiver); } catch (IOException ioe) { LOG.error("Error writing to Splunk: {}", ioe.getMessage()); - logWriteFailures(countState); + UNSUCCESSFUL_WRITE_LATENCY_MS.update((System.nanoTime() - startTime) / 1_000_000L); + FAILED_WRITES.inc(countState.read()); + INVALID_REQUESTS.inc(); + + logWriteFailures(countState, 0, ioe.getMessage(), null); 
flushWriteFailures(events, ioe.getMessage(), null, receiver); @@ -262,10 +344,20 @@ private void flush( } } - /** Logs write failures and update {@link Counter}. */ - private void logWriteFailures(@StateId(COUNT_STATE_NAME) ValueState countState) { - LOG.error("Failed to write {} events", countState.read()); - FAILED_WRITES.inc(countState.read()); + /** Utility method to log write failures. */ + private void logWriteFailures( + @StateId(COUNT_STATE_NAME) ValueState countState, + int statusCode, + String content, + String statusMessage) { + if (enableBatchLogs) { + LOG.error("Failed to write {} events", countState.read()); + } + LOG.error( + "Error writing to Splunk. StatusCode: {}, content: {}, StatusMessage: {}", + statusCode, + content, + statusMessage); } /** @@ -318,6 +410,10 @@ abstract static class Builder { abstract Builder setDisableCertificateValidation( ValueProvider disableCertificateValidation); + abstract Builder setEnableBatchLogs(ValueProvider enableBatchLogs); + + abstract Builder setEnableGzipHttpCompression(ValueProvider enableGzipHttpCompression); + abstract Builder setInputBatchCount(ValueProvider inputBatchCount); abstract SplunkEventWriter autoBuild(); @@ -386,6 +482,26 @@ Builder withDisableCertificateValidation(ValueProvider disableCertifica return setDisableCertificateValidation(disableCertificateValidation); } + /** + * Method to enable batch logs. + * + * @param enableBatchLogs for enabling batch logs. + * @return {@link Builder} + */ + public Builder withEnableBatchLogs(ValueProvider enableBatchLogs) { + return setEnableBatchLogs(enableBatchLogs); + } + + /** + * Method to specify if HTTP requests sent to Splunk should be GZIP encoded. + * + * @param enableGzipHttpCompression whether to enable Gzip encoding. 
+ * @return {@link Builder} + */ + public Builder withEnableGzipHttpCompression(ValueProvider enableGzipHttpCompression) { + return setEnableGzipHttpCompression(enableGzipHttpCompression); + } + /** Builds a new {@link SplunkEventWriter} objects based on the configuration. */ SplunkEventWriter build() { checkNotNull(url(), "url needs to be provided."); diff --git a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java index 938ae5ef2b78..633f25b0b9d6 100644 --- a/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java +++ b/sdks/java/io/splunk/src/main/java/org/apache/beam/sdk/io/splunk/SplunkIO.java @@ -64,6 +64,9 @@ *
  <li>batchCount - Number of events in a single batch. *
  <li>disableCertificateValidation - Whether to disable ssl validation (useful for self-signed * certificates) + *
  <li>enableBatchLogs - Whether to enable batch logs. + *
  <li>enableGzipHttpCompression - Whether HTTP requests sent to Splunk HEC should be GZIP + * encoded. * </ul> * *

    <p>This transform will return any non-transient write failures via a {@link PCollection @@ -139,6 +142,10 @@ public abstract static class Write abstract @Nullable ValueProvider disableCertificateValidation(); + abstract @Nullable ValueProvider enableBatchLogs(); + + abstract @Nullable ValueProvider enableGzipHttpCompression(); + abstract Builder toBuilder(); @Override @@ -150,7 +157,9 @@ public PCollection expand(PCollection input) { .withUrl(url()) .withInputBatchCount(batchCount()) .withDisableCertificateValidation(disableCertificateValidation()) - .withToken(token()); + .withToken(token()) + .withEnableBatchLogs(enableBatchLogs()) + .withEnableGzipHttpCompression(enableGzipHttpCompression()); SplunkEventWriter writer = builder.build(); LOG.info("SplunkEventWriter configured"); @@ -176,6 +185,11 @@ abstract static class Builder { abstract Builder setDisableCertificateValidation( ValueProvider disableCertificateValidation); + abstract Builder setEnableBatchLogs(ValueProvider enableBatchLogs); + + abstract Builder setEnableGzipHttpCompression( + ValueProvider enableGzipHttpCompression); + abstract Write build(); } @@ -249,6 +263,60 @@ public Write withDisableCertificateValidation(Boolean disableCertificateValidati .build(); } + /** + * Method to enable batch logs. + * + * @param enableBatchLogs whether to enable batch logs. + * @return {@link Builder} + */ + public Write withEnableBatchLogs(ValueProvider enableBatchLogs) { + checkArgument( + enableBatchLogs != null, "withEnableBatchLogs(enableBatchLogs) called with null input."); + return toBuilder().setEnableBatchLogs(enableBatchLogs).build(); + } + + /** + * Same as {@link Builder#withEnableBatchLogs(ValueProvider)} but without a {@link + * ValueProvider}. + * + * @param enableBatchLogs whether to enable batch logs. 
+ * @return {@link Builder} + */ + public Write withEnableBatchLogs(Boolean enableBatchLogs) { + checkArgument( + enableBatchLogs != null, "withEnableBatchLogs(enableBatchLogs) called with null input."); + return toBuilder().setEnableBatchLogs(StaticValueProvider.of(enableBatchLogs)).build(); + } + + /** + * Method to specify if HTTP requests sent to Splunk should be GZIP encoded. + * + * @param enableGzipHttpCompression whether to enable Gzip encoding. + * @return {@link Builder} + */ + public Write withEnableGzipHttpCompression(ValueProvider enableGzipHttpCompression) { + checkArgument( + enableGzipHttpCompression != null, + "withEnableGzipHttpCompression(enableGzipHttpCompression) called with null input."); + return toBuilder().setEnableGzipHttpCompression(enableGzipHttpCompression).build(); + } + + /** + * Same as {@link Builder#withEnableGzipHttpCompression(ValueProvider)} but without a {@link + * ValueProvider}. + * + * @param enableGzipHttpCompression whether to enable Gzip encoding. + * @return {@link Builder} + */ + public Write withEnableGzipHttpCompression(Boolean enableGzipHttpCompression) { + checkArgument( + enableGzipHttpCompression != null, + "withEnableGzipHttpCompression(enableGzipHttpCompression) called with null input."); + return toBuilder() + .setEnableGzipHttpCompression(StaticValueProvider.of(enableGzipHttpCompression)) + .build(); + } + /** * Provides synthetic keys that are used to control the number of parallel requests towards the * Splunk HEC endpoint. 
diff --git a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/HttpEventPublisherTest.java b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/HttpEventPublisherTest.java index 99ad4beba156..aa02cc6b08bb 100644 --- a/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/HttpEventPublisherTest.java +++ b/sdks/java/io/splunk/src/test/java/org/apache/beam/sdk/io/splunk/HttpEventPublisherTest.java @@ -69,6 +69,7 @@ public void stringPayloadTest() .withUrl("http://example.com") .withToken("test-token") .withDisableCertificateValidation(false) + .withEnableGzipHttpCompression(true) .build(); String actual = publisher.getStringPayload(SPLUNK_EVENTS); @@ -92,6 +93,7 @@ public void contentTest() .withUrl("http://example.com") .withToken("test-token") .withDisableCertificateValidation(false) + .withEnableGzipHttpCompression(true) .build(); String expectedString = @@ -118,7 +120,8 @@ public void genericURLTest() HttpEventPublisher.newBuilder() .withUrl(baseURL) .withToken("test-token") - .withDisableCertificateValidation(false); + .withDisableCertificateValidation(false) + .withEnableGzipHttpCompression(true); assertEquals( new GenericUrl(Joiner.on('/').join(baseURL, "services/collector/event")), @@ -134,6 +137,7 @@ public void configureBackOffDefaultTest() .withUrl("http://example.com") .withToken("test-token") .withDisableCertificateValidation(false) + .withEnableGzipHttpCompression(true) .build(); assertEquals( @@ -151,6 +155,7 @@ public void configureBackOffCustomTest() .withUrl("http://example.com") .withToken("test-token") .withDisableCertificateValidation(false) + .withEnableGzipHttpCompression(true) .withMaxElapsedMillis(timeoutInMillis) .build();