From cb4efc71328181d93599b5cc33f03b6b5c2fd2db Mon Sep 17 00:00:00 2001 From: Hengfeng Li Date: Mon, 14 Feb 2022 16:18:40 +1100 Subject: [PATCH] [BEAM-12164]: display the metadata table's name on UI --- .../beam/sdk/io/gcp/spanner/SpannerIO.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 863d88ab54e6..94e8930a1e54 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -80,6 +80,7 @@ import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; import org.apache.beam.sdk.schemas.Schema; @@ -1442,6 +1443,7 @@ public ReadChangeStream withMetadataDatabase(String metadataDatabase) { return toBuilder().setMetadataDatabase(metadataDatabase).build(); } + /** Specifies the metadata table name. */ public ReadChangeStream withMetadataTable(String metadataTable) { return toBuilder().setMetadataTable(metadataTable).build(); } @@ -1585,6 +1587,11 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta new PostProcessingMetricsDoFn(metrics); LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName); + input + .getPipeline() + .getOptions() + .as(SpannerChangeStreamOptions.class) + .setMetadataTable(partitionMetadataTableName); return input .apply(Impulse.create()) @@ -1596,6 +1603,19 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta } } + /** + * Interface to display the name of the metadata table on Dataflow UI. This is only used for + * internal purpose. This should not be used to pass the name of the metadata table. + */ + public interface SpannerChangeStreamOptions extends StreamingOptions { + + /** Returns the name of the metadata table. */ + String getMetadataTable(); + + /** Specifies the name of the metadata table. */ + void setMetadataTable(String table); + } + private static class ToMutationGroupFn extends DoFn { @ProcessElement public void processElement(ProcessContext c) {