> udfDeadletterTag);
+
+ public abstract LineToFailsafeJson build();
+ }
+ }
+
+ /**
+ * The {@link FailsafeElementToJsonFn} class creates a Json string from a failsafe element.
+ *
+ * {@link FailsafeElementToJsonFn#FailsafeElementToJsonFn(PCollectionView, String, String,
+ * TupleTag)}
+ */
+ public static class FailsafeElementToJsonFn
+ extends DoFn, FailsafeElement> {
+
+ @Nullable public final String jsonSchema;
+ public final String delimiter;
+ public final TupleTag> udfDeadletterTag;
+ @Nullable private final PCollectionView headersView;
+ private Counter successCounter =
+ Metrics.counter(FailsafeElementToJsonFn.class, SUCCESSFUL_TO_JSON_COUNTER);
+ private Counter failedCounter =
+ Metrics.counter(FailsafeElementToJsonFn.class, FAILED_TO_JSON_COUNTER);
+
+ FailsafeElementToJsonFn(
+ PCollectionView headersView,
+ String jsonSchema,
+ String delimiter,
+ TupleTag> udfDeadletterTag) {
+ this.headersView = headersView;
+ this.jsonSchema = jsonSchema;
+ this.delimiter = delimiter;
+ this.udfDeadletterTag = udfDeadletterTag;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ FailsafeElement element = context.element();
+ List header = null;
+
+ if (this.headersView != null) {
+ header = Arrays.asList(context.sideInput(this.headersView).split(this.delimiter));
+ }
+
+ List record = Arrays.asList(element.getOriginalPayload().split(this.delimiter));
+
+ try {
+ String json = buildJsonString(header, record, this.jsonSchema);
+ context.output(FailsafeElement.of(element.getOriginalPayload(), json));
+ successCounter.inc();
+ } catch (Exception e) {
+ failedCounter.inc();
+ context.output(
+ this.udfDeadletterTag,
+ FailsafeElement.of(element)
+ .setErrorMessage(e.getMessage())
+ .setStacktrace(Throwables.getStackTraceAsString(e)));
+ }
+ }
+ }
+
+ /**
+ * The {@link LineToFailsafeElementFn} wraps an csv line with the {@link FailsafeElement} class so
+ * errors can be recovered from and the original message can be output to a error records table.
+ */
+ static class LineToFailsafeElementFn extends DoFn> {
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ String message = context.element();
+ context.output(FailsafeElement.of(message, message));
+ }
+ }
+
+ /**
+ * The {@link ReadCsv} class is a {@link PTransform} that reads from one for more Csv files. The
+ * transform returns a {@link PCollectionTuple} consisting of the following {@link PCollection}:
+ *
+ *
+ * - {@link ReadCsv#headerTag()} - Contains headers found in files if read with headers,
+ * contains empty {@link PCollection} if no headers.
+ *
- {@link ReadCsv#lineTag()} - Contains Csv lines as a {@link PCollection} of strings.
+ *
+ */
+ @AutoValue
+ public abstract static class ReadCsv extends PTransform {
+
+ public static Builder newBuilder() {
+ return new AutoValue_CsvConverters_ReadCsv.Builder();
+ }
+
+ public abstract String csvFormat();
+
+ @Nullable
+ public abstract String delimiter();
+
+ public abstract Boolean hasHeaders();
+
+ public abstract String inputFileSpec();
+
+ public abstract TupleTag headerTag();
+
+ public abstract TupleTag lineTag();
+
+ @Override
+ public PCollectionTuple expand(PBegin input) {
+
+ if (hasHeaders()) {
+ return input
+ .apply("MatchFilePattern", FileIO.match().filepattern(inputFileSpec()))
+ .apply("ReadMatches", FileIO.readMatches())
+ .apply(
+ "ReadCsvWithHeaders",
+ ParDo.of(new GetCsvHeadersFn(headerTag(), lineTag(), csvFormat(), delimiter()))
+ .withOutputTags(headerTag(), TupleTagList.of(lineTag())));
+ }
+
+ return PCollectionTuple.of(
+ lineTag(), input.apply("ReadCsvWithoutHeaders", TextIO.read().from(inputFileSpec())));
+ }
+
+ /** Builder for {@link ReadCsv}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setCsvFormat(String csvFormat);
+
+ public abstract Builder setDelimiter(@Nullable String delimiter);
+
+ public abstract Builder setHasHeaders(Boolean hasHeaders);
+
+ public abstract Builder setInputFileSpec(String inputFileSpec);
+
+ public abstract Builder setHeaderTag(TupleTag headerTag);
+
+ public abstract Builder setLineTag(TupleTag lineTag);
+
+ abstract ReadCsv autoBuild();
+
+ public ReadCsv build() {
+
+ ReadCsv readCsv = autoBuild();
+
+ checkArgument(readCsv.inputFileSpec() != null, "Input file spec must be provided.");
+
+ checkArgument(readCsv.csvFormat() != null, "Csv format must not be null.");
+
+ checkArgument(readCsv.hasHeaders() != null, "Header information must be provided.");
+
+ return readCsv;
+ }
+ }
+ }
+
+ /**
+ * The {@link GetCsvHeadersFn} class gets the header of a Csv file and outputs it as a string. The
+ * csv format provided in {@link CsvConverters#getCsvFormat(String, String)} is used to get the
+ * header.
+ */
+ static class GetCsvHeadersFn extends DoFn {
+
+ private final TupleTag headerTag;
+ private final TupleTag linesTag;
+ private CSVFormat csvFormat;
+
+ GetCsvHeadersFn(
+ TupleTag headerTag, TupleTag linesTag, String csvFormat, String delimiter) {
+ this.headerTag = headerTag;
+ this.linesTag = linesTag;
+ this.csvFormat = getCsvFormat(csvFormat, delimiter);
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context, MultiOutputReceiver outputReceiver) {
+ ReadableFile f = context.element();
+ String headers;
+ List records = null;
+ String delimiter = String.valueOf(this.csvFormat.getDelimiter());
+ try {
+ String csvFileString = f.readFullyAsUTF8String();
+ StringReader reader = new StringReader(csvFileString);
+ CSVParser parser = CSVParser.parse(reader, this.csvFormat.withFirstRecordAsHeader());
+ records =
+ parser.getRecords().stream()
+ .map(i -> String.join(delimiter, i))
+ .collect(Collectors.toList());
+ headers = String.join(delimiter, parser.getHeaderNames());
+ } catch (IOException ioe) {
+ LOG.error("Headers do not match, consistency cannot be guaranteed");
+ throw new RuntimeException("Could not read Csv headers: " + ioe.getMessage());
+ }
+ outputReceiver.get(this.headerTag).output(headers);
+ records.forEach(r -> outputReceiver.get(this.linesTag).output(r));
+ }
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/DurationUtils.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/DurationUtils.java
new file mode 100644
index 000000000000..83d3aea3acb5
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/DurationUtils.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.utils;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Locale;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.MutablePeriod;
+import org.joda.time.format.PeriodFormatterBuilder;
+import org.joda.time.format.PeriodParser;
+
+/**
+ * The {@link DurationUtils} class provides common utilities for manipulating and formatting {@link
+ * Duration} objects.
+ */
+public class DurationUtils {
+
+ /**
+ * Parses a duration from a period formatted string. Values are accepted in the following formats:
+ *
+ * Formats Ns - Seconds. Example: 5s
+ * Nm - Minutes. Example: 13m
+ * Nh - Hours. Example: 2h
+ *
+ *
+ * parseDuration(null) = NullPointerException()
+ * parseDuration("") = Duration.standardSeconds(0)
+ * parseDuration("2s") = Duration.standardSeconds(2)
+ * parseDuration("5m") = Duration.standardMinutes(5)
+ * parseDuration("3h") = Duration.standardHours(3)
+ *
+ *
+ * @param value The period value to parse.
+ * @return The {@link Duration} parsed from the supplied period string.
+ */
+ public static Duration parseDuration(String value) {
+ checkNotNull(value, "The specified duration must be a non-null value!");
+
+ PeriodParser parser =
+ new PeriodFormatterBuilder()
+ .appendSeconds()
+ .appendSuffix("s")
+ .appendMinutes()
+ .appendSuffix("m")
+ .appendHours()
+ .appendSuffix("h")
+ .toParser();
+
+ MutablePeriod period = new MutablePeriod();
+ parser.parseInto(period, value, 0, Locale.getDefault());
+
+ Duration duration = period.toDurationFrom(new DateTime(0));
+ checkArgument(duration.getMillis() > 0, "The window duration must be greater than 0!");
+
+ return duration;
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java
new file mode 100644
index 000000000000..04beac038926
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/datatokenization/utils/ErrorConverters.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.complete.datatokenization.utils;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.auto.value.AutoValue;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
+import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PDone;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Duration;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+/** Transforms & DoFns & Options for Teleport Error logging. */
+public class ErrorConverters {
+
+ /** Writes all Errors to GCS, place at the end of your pipeline. */
+ @AutoValue
+ public abstract static class WriteStringMessageErrorsAsCsv
+ extends PTransform>, PDone> {
+
+ public static Builder newBuilder() {
+ return new AutoValue_ErrorConverters_WriteStringMessageErrorsAsCsv.Builder();
+ }
+
+ public abstract String errorWritePath();
+
+ public abstract String csvDelimiter();
+
+ @Nullable
+ public abstract Duration windowDuration();
+
+ @SuppressWarnings("argument.type.incompatible")
+ @Override
+ public PDone expand(PCollection> pCollection) {
+
+ PCollection formattedErrorRows =
+ pCollection.apply(
+ "GetFormattedErrorRow", ParDo.of(new FailedStringToCsvRowFn(csvDelimiter())));
+
+ if (pCollection.isBounded() == IsBounded.UNBOUNDED) {
+ if (windowDuration() != null) {
+ formattedErrorRows =
+ formattedErrorRows.apply(Window.into(FixedWindows.of(windowDuration())));
+ }
+ return formattedErrorRows.apply(
+ TextIO.write().to(errorWritePath()).withNumShards(1).withWindowedWrites());
+
+ } else {
+ return formattedErrorRows.apply(TextIO.write().to(errorWritePath()).withNumShards(1));
+ }
+ }
+
+ /** Builder for {@link WriteStringMessageErrorsAsCsv}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setErrorWritePath(String errorWritePath);
+
+ public abstract Builder setCsvDelimiter(String csvDelimiter);
+
+ public abstract Builder setWindowDuration(@Nullable Duration duration);
+
+ public abstract WriteStringMessageErrorsAsCsv build();
+ }
+ }
+
+ /**
+ * The {@link FailedStringToCsvRowFn} converts string objects which have failed processing into
+ * {@link String} objects contained CSV which can be output to a filesystem.
+ */
+ public static class FailedStringToCsvRowFn extends DoFn, String> {
+
+ /**
+ * The formatter used to convert timestamps into a BigQuery compatible format.
+ */
+ private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+
+ private final String csvDelimiter;
+
+ public FailedStringToCsvRowFn(String csvDelimiter) {
+ this.csvDelimiter = csvDelimiter;
+ }
+
+ public FailedStringToCsvRowFn() {
+ this.csvDelimiter = ",";
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ FailsafeElement failsafeElement = context.element();
+ ArrayList outputRow = new ArrayList<>();
+ final String message = failsafeElement.getOriginalPayload();
+
+ // Format the timestamp for insertion
+ String timestamp =
+ TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC));
+
+ outputRow.add(timestamp);
+ outputRow.add(failsafeElement.getErrorMessage());
+
+ // Only set the payload if it's populated on the message.
+ if (message != null) {
+ outputRow.add(message);
+ }
+
+ context.output(String.join(csvDelimiter, outputRow));
+ }
+ }
+
+ /** Write errors as string encoded messages. */
+ @AutoValue
+ public abstract static class WriteStringMessageErrors
+ extends PTransform>, WriteResult> {
+
+ public static Builder newBuilder() {
+ return new AutoValue_ErrorConverters_WriteStringMessageErrors.Builder();
+ }
+
+ public abstract String getErrorRecordsTable();
+
+ public abstract String getErrorRecordsTableSchema();
+
+ @Override
+ public WriteResult expand(PCollection> failedRecords) {
+
+ return failedRecords
+ .apply("FailedRecordToTableRow", ParDo.of(new FailedStringToTableRowFn()))
+ .apply(
+ "WriteFailedRecordsToBigQuery",
+ BigQueryIO.writeTableRows()
+ .to(getErrorRecordsTable())
+ .withJsonSchema(getErrorRecordsTableSchema())
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(WriteDisposition.WRITE_APPEND));
+ }
+
+ /** Builder for {@link WriteStringMessageErrors}. */
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setErrorRecordsTable(String errorRecordsTable);
+
+ public abstract Builder setErrorRecordsTableSchema(String errorRecordsTableSchema);
+
+ public abstract WriteStringMessageErrors build();
+ }
+ }
+
+ /**
+ * The {@link FailedStringToTableRowFn} converts string objects which have failed processing into
+ * {@link TableRow} objects which can be output to a dead-letter table.
+ */
+ public static class FailedStringToTableRowFn
+ extends DoFn, TableRow> {
+
+ /**
+ * The formatter used to convert timestamps into a BigQuery compatible format.
+ */
+ private static final DateTimeFormatter TIMESTAMP_FORMATTER =
+ DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ FailsafeElement failsafeElement = context.element();
+ final String message = failsafeElement.getOriginalPayload();
+
+ // Format the timestamp for insertion
+ String timestamp =
+ TIMESTAMP_FORMATTER.print(context.timestamp().toDateTime(DateTimeZone.UTC));
+
+ // Build the table row
+ final TableRow failedRow =
+ new TableRow()
+ .set("timestamp", timestamp)
+ .set("errorMessage", failsafeElement.getErrorMessage())
+ .set("stacktrace", failsafeElement.getStacktrace());
+
+ // Only set the payload if it's populated on the message.
+ if (message != null) {
+ failedRow
+ .set("payloadString", message)
+ .set("payloadBytes", message.getBytes(StandardCharsets.UTF_8));
+ }
+
+ context.output(failedRow);
+ }
+ }
+
+ /**
+ * {@link WriteErrorsToTextIO} is a {@link PTransform} that writes strings error messages to file
+ * system using TextIO and custom line format {@link SerializableFunction} to convert errors in
+ * necessary format.
+ * Example of usage in pipeline:
+ *
+ * {@code
+ * pCollection.apply("Write to TextIO",
+ * WriteErrorsToTextIO.newBuilder()
+ * .setErrorWritePath("errors.txt")
+ * .setTranslateFunction((FailsafeElement failsafeElement) -> {
+ * ArrayList outputRow = new ArrayList<>();
+ * final String message = failsafeElement.getOriginalPayload();
+ * String timestamp = Instant.now().toString();
+ * outputRow.add(timestamp);
+ * outputRow.add(failsafeElement.getErrorMessage());
+ * outputRow.add(failsafeElement.getStacktrace());
+ * // Only set the payload if it's populated on the message.
+ * if (failsafeElement.getOriginalPayload() != null) {
+ * outputRow.add(message);
+ * }
+ *
+ * return String.join(",",outputRow);
+ * })
+ * }
+ */
+ @AutoValue
+ public abstract static class WriteErrorsToTextIO
+ extends PTransform>, PDone> {
+
+ public static WriteErrorsToTextIO.Builder newBuilder() {
+ return new AutoValue_ErrorConverters_WriteErrorsToTextIO.Builder<>();
+ }
+
+ public abstract String errorWritePath();
+
+ public abstract SerializableFunction, String> translateFunction();
+
+ @Nullable
+ public abstract Duration windowDuration();
+
+ @Override
+ @SuppressWarnings("argument.type.incompatible")
+ public PDone expand(PCollection> pCollection) {
+
+ PCollection formattedErrorRows =
+ pCollection.apply(
+ "GetFormattedErrorRow",
+ MapElements.into(TypeDescriptors.strings()).via(translateFunction()));
+
+ if (pCollection.isBounded() == PCollection.IsBounded.UNBOUNDED) {
+ if (windowDuration() == null) {
+ throw new RuntimeException("Unbounded input requires window interval to be set");
+ }
+ return formattedErrorRows
+ .apply(Window.into(FixedWindows.of(windowDuration())))
+ .apply(TextIO.write().to(errorWritePath()).withNumShards(1).withWindowedWrites());
+ }
+
+ return formattedErrorRows.apply(TextIO.write().to(errorWritePath()).withNumShards(1));
+ }
+
+ /** Builder for {@link WriteErrorsToTextIO}. */
+ @AutoValue.Builder
+ public abstract static class Builder