From 7a606997ccabb7f1916c4ed28174b9e0710f0976 Mon Sep 17 00:00:00 2001 From: Mackenzie Clark Date: Mon, 22 Feb 2021 16:13:46 +0000 Subject: [PATCH 1/7] Returning successes from FhirIO executeBundles. Needed for healthcare solutions accuracy in logging what was written to the fhir store. --- .../beam/sdk/io/gcp/healthcare/FhirIO.java | 119 ++++++++++++------ 1 file changed, 82 insertions(+), 37 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 359ebf420fbd..7e6bcff71ba5 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -94,6 +94,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +// TODO(b/175212927): Remove this package and use the Beam library once FHIR Search connector is +// released. /** * {@link FhirIO} provides an API for reading and writing resources to Google Cloud Healthcare Fhir API. @@ -238,7 +240,7 @@ * */ @SuppressWarnings({ - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) + "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) }) public class FhirIO { private static final String BASE_METRIC_PREFIX = "fhirio/"; @@ -568,13 +570,15 @@ private String fetchResource(HealthcareApiClient client, String resourceId) @AutoValue public abstract static class Write extends PTransform, Write.Result> { - /** The tag for the failed writes to FHIR store`. */ + /** The tag for successful writes to FHIR store. */ + public static final TupleTag SUCCESSFUL_BODY = new TupleTag() {}; + /** The tag for the failed writes to FHIR store. */ public static final TupleTag> FAILED_BODY = new TupleTag>() {}; - /** The tag for the files that failed to FHIR store`. */ + /** The tag for the files that failed to FHIR store. */ public static final TupleTag> FAILED_FILES = new TupleTag>() {}; - /** The tag for temp files for import to FHIR store`. */ + /** The tag for temp files for import to FHIR store. */ public static final TupleTag TEMP_FILES = new TupleTag() {}; /** The enum Write method. */ @@ -595,25 +599,46 @@ public enum WriteMethod { /** The type Result. */ public static class Result implements POutput { private final Pipeline pipeline; + private final PCollection successfulBodies; private final PCollection> failedBodies; private final PCollection> failedFiles; /** - * Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}. @param pipeline the - * pipeline + * Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}. * + * @param pipeline the pipeline * @param failedBodies the failed inserts * @return the result */ static Result in(Pipeline pipeline, PCollection> failedBodies) { - return new Result(pipeline, failedBodies, null); + return new Result(pipeline, null, failedBodies, null); + } + + /** + * Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}. + * + * @param pipeline the pipeline + * @param bodies the successful and failing bodies results. + * @return the result + */ + static Result in(Pipeline pipeline, PCollectionTuple bodies) { + return new Result(pipeline, bodies.get(SUCCESSFUL_BODY), bodies.get(FAILED_BODY), null); } static Result in( Pipeline pipeline, PCollection> failedBodies, PCollection> failedFiles) { - return new Result(pipeline, failedBodies, failedFiles); + return new Result(pipeline, null, failedBodies, failedFiles); + } + + /** + * Gets successful bodies from Write. + * + * @return the entries that were inserted + */ + public PCollection getSuccessfulBodies() { + return this.successfulBodies; } /** @@ -641,7 +666,13 @@ public Pipeline getPipeline() { @Override public Map, PValue> expand() { - return ImmutableMap.of(Write.FAILED_BODY, failedBodies, Write.FAILED_FILES, failedFiles); + return ImmutableMap.of( + SUCCESSFUL_BODY, + successfulBodies, + FAILED_BODY, + failedBodies, + Write.FAILED_FILES, + failedFiles); } @Override @@ -650,9 +681,15 @@ public void finishSpecifyingOutput( private Result( Pipeline pipeline, + @Nullable PCollection successfulBodies, PCollection> failedBodies, @Nullable PCollection> failedFiles) { this.pipeline = pipeline; + if (successfulBodies == null) { + successfulBodies = + (PCollection) pipeline.apply(Create.empty(StringUtf8Coder.of())); + } + this.successfulBodies = successfulBodies; this.failedBodies = failedBodies; if (failedFiles == null) { failedFiles = @@ -808,7 +845,7 @@ public static Write fhirStoresImport( * Execute Bundle Method executes a batch of requests as a single transaction @see . * - * @param fhirStore the hl 7 v 2 store + * @param fhirStore the fhir store * @return the write */ public static Write executeBundles(String fhirStore) { @@ -835,8 +872,7 @@ public static Write executeBundles(ValueProvider fhirStore) { @Override public Result expand(PCollection input) { - PCollection> failedBundles; - PCollection> failedImports; + PCollectionTuple bundles; switch (this.getWriteMethod()) { case IMPORT: LOG.warn( @@ -854,14 +890,15 @@ public Result expand(PCollection input) { return input.apply(new Import(getFhirStore(), tempPath, deadPath, contentStructure)); case EXECUTE_BUNDLE: default: - failedBundles = - input - .apply( - "Execute FHIR Bundles", - ParDo.of(new ExecuteBundles.ExecuteBundlesFn(this.getFhirStore()))) - .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); + bundles = + input.apply( + "Execute FHIR Bundles", + ParDo.of(new ExecuteBundles.ExecuteBundlesFn(this.getFhirStore())) + .withOutputTags(SUCCESSFUL_BODY, TupleTagList.of(FAILED_BODY))); + bundles.get(SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of()); + bundles.get(FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); } - return Result.in(input.getPipeline(), failedBundles); + return Result.in(input.getPipeline(), bundles); } } @@ -963,7 +1000,7 @@ Optional> getImportGcsDeadLetterPath() { public Write.Result expand(PCollection input) { checkState( input.isBounded() == IsBounded.BOUNDED, - "FhirIO.Import should only be used on unbounded PCollections as it is" + "FhirIO.Import should only be used on bounded PCollections as it is" + "intended for batch use only."); // fall back on pipeline's temp location. @@ -976,11 +1013,11 @@ public Write.Result expand(PCollection input) { input.apply( "Write nd json to GCS", ParDo.of(new WriteBundlesToFilesFn(fhirStore, tempPath, deadLetterGcsPath)) - .withOutputTags(Write.TEMP_FILES, TupleTagList.of(Write.FAILED_BODY))); + .withOutputTags(Write.TEMP_FILES, TupleTagList.of(FAILED_BODY))); PCollection> failedBodies = writeTmpFileResults - .get(Write.FAILED_BODY) + .get(FAILED_BODY) .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); int numShards = 100; PCollection> failedFiles = @@ -1016,12 +1053,15 @@ public Write.Result expand(PCollection input) { new DoFn() { @ProcessElement public void delete(@Element Metadata path, ProcessContext context) { - // Wait til window closes for failedBodies and failedFiles to ensure we are + // Wait til window closes for failedBodies and + // failedFiles to ensure we are // done processing - // anything under tempGcsPath because it has been successfully imported to + // anything under tempGcsPath because it has been + // successfully imported to // FHIR store or // copies have been moved to the dead letter path. - // Clean up all of tempGcsPath. This will handle removing phantom temporary + // Clean up all of tempGcsPath. This will handle + // removing phantom temporary // objects from // failed / rescheduled ImportFn::importBatch. try { @@ -1123,8 +1163,7 @@ public void addToFile(ProcessContext context, BoundedWindow window) throws IOExc + "Dropping message from batch import.", httpBody.toString(), e.getLocation().getCharOffset(), e.getMessage()); LOG.warn(resource); - context.output( - Write.FAILED_BODY, HealthcareIOError.of(httpBody, new IOException(resource))); + context.output(FAILED_BODY, HealthcareIOError.of(httpBody, new IOException(resource))); } } @@ -1220,7 +1259,8 @@ public void importBatch( client.importFhirResource( fhirStore.get(), importUri.toString(), contentStructure.name()); client.pollOperation(operation, 500L); - // Clean up temp files on GCS as they we successfully imported to FHIR store and no longer + // Clean up temp files on GCS as they we successfully imported to FHIR store and + // no longer // needed. FileSystems.delete(tempDestinations); } catch (IOException | InterruptedException e) { @@ -1233,7 +1273,8 @@ public void importBatch( FileSystems.rename(tempDestinations, deadLetterDestinations); output.output(HealthcareIOError.of(importUri.toString(), e)); } finally { - // If we've reached this point files have either been successfully import to FHIR store + // If we've reached this point files have either been successfully import to + // FHIR store // or moved to Dead Letter Queue. // Clean up original files for this batch on GCS. FileSystems.delete(ImmutableList.copyOf(batch)); @@ -1287,15 +1328,17 @@ public static class ExecuteBundles extends PTransform, Write @Override public FhirIO.Write.Result expand(PCollection input) { - return Write.Result.in( - input.getPipeline(), - input - .apply(ParDo.of(new ExecuteBundlesFn(fhirStore))) - .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()))); + PCollectionTuple bodies = + input.apply( + ParDo.of(new ExecuteBundlesFn(fhirStore)) + .withOutputTags(Write.SUCCESSFUL_BODY, TupleTagList.of(Write.FAILED_BODY))); + bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of()); + bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); + return Write.Result.in(input.getPipeline(), bodies); } /** The type Write Fhir fn. */ - static class ExecuteBundlesFn extends DoFn> { + static class ExecuteBundlesFn extends DoFn { private static final Counter EXECUTE_BUNDLE_ERRORS = Metrics.counter( @@ -1324,7 +1367,7 @@ static class ExecuteBundlesFn extends DoFn> { /** * Initialize healthcare client. * - * @throws IOException the io exception + * @throws IOException If the Healthcare client cannot be created. */ @Setup public void initClient() throws IOException { @@ -1341,9 +1384,10 @@ public void executeBundles(ProcessContext context) { client.executeFhirBundle(fhirStore.get(), body); EXECUTE_BUNDLE_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime); EXECUTE_BUNDLE_SUCCESS.inc(); + context.output(Write.SUCCESSFUL_BODY, body); } catch (IOException | HealthcareHttpException e) { EXECUTE_BUNDLE_ERRORS.inc(); - context.output(HealthcareIOError.of(body, e)); + context.output(Write.FAILED_BODY, HealthcareIOError.of(body, e)); } } } @@ -1685,6 +1729,7 @@ private JsonArray searchResources( new HttpHealthcareApiClient.FhirResourcePages.FhirResourcePagesIterator( client, fhirStore, resourceType, parameterObjects); JsonArray result = new JsonArray(); + result.addAll(iter.next()); while (iter.hasNext()) { result.addAll(iter.next()); } From 7f891c3a49321d5139db56b7e14d0254cd2eaa4a Mon Sep 17 00:00:00 2001 From: Mackenzie Clark Date: Mon, 22 Feb 2021 16:28:28 +0000 Subject: [PATCH 2/7] Syncing with milenas change --- .../main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 7e6bcff71ba5..d875e7d27cbc 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -1729,7 +1729,6 @@ private JsonArray searchResources( new HttpHealthcareApiClient.FhirResourcePages.FhirResourcePagesIterator( client, fhirStore, resourceType, parameterObjects); JsonArray result = new JsonArray(); - result.addAll(iter.next()); while (iter.hasNext()) { result.addAll(iter.next()); } From ba4a1114c7803070aeb83b60d3b062ae92b34b30 Mon Sep 17 00:00:00 2001 From: Mackenzie Clark Date: Mon, 22 Feb 2021 16:38:14 +0000 Subject: [PATCH 3/7] Undo formatting changes from Google auto-formatter. --- .../beam/sdk/io/gcp/healthcare/FhirIO.java | 34 +++++++------------ 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index d875e7d27cbc..395ee95b62d2 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -94,8 +94,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// TODO(b/175212927): Remove this package and use the Beam library once FHIR Search connector is -// released. /** * {@link FhirIO} provides an API for reading and writing resources to Google Cloud Healthcare Fhir API. @@ -240,7 +238,7 @@ * */ @SuppressWarnings({ - "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) + "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) }) public class FhirIO { private static final String BASE_METRIC_PREFIX = "fhirio/"; @@ -1013,11 +1011,11 @@ public Write.Result expand(PCollection input) { input.apply( "Write nd json to GCS", ParDo.of(new WriteBundlesToFilesFn(fhirStore, tempPath, deadLetterGcsPath)) - .withOutputTags(Write.TEMP_FILES, TupleTagList.of(FAILED_BODY))); + .withOutputTags(Write.TEMP_FILES, TupleTagList.of(Write.FAILED_BODY))); PCollection> failedBodies = writeTmpFileResults - .get(FAILED_BODY) + .get(Write.FAILED_BODY) .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); int numShards = 100; PCollection> failedFiles = @@ -1053,17 +1051,12 @@ public Write.Result expand(PCollection input) { new DoFn() { @ProcessElement public void delete(@Element Metadata path, ProcessContext context) { - // Wait til window closes for failedBodies and - // failedFiles to ensure we are - // done processing - // anything under tempGcsPath because it has been - // successfully imported to - // FHIR store or - // copies have been moved to the dead letter path. - // Clean up all of tempGcsPath. This will handle - // removing phantom temporary - // objects from - // failed / rescheduled ImportFn::importBatch. + // Wait til window closes for failedBodies and failedFiles to ensure we are + // done processing anything under tempGcsPath because it has been + // successfully imported to FHIR store or copies have been moved to the + // dead letter path. + // Clean up all of tempGcsPath. This will handle removing phantom temporary + // objects from failed / rescheduled ImportFn::importBatch. try { FileSystems.delete( Collections.singleton(path.resourceId()), @@ -1163,7 +1156,8 @@ public void addToFile(ProcessContext context, BoundedWindow window) throws IOExc + "Dropping message from batch import.", httpBody.toString(), e.getLocation().getCharOffset(), e.getMessage()); LOG.warn(resource); - context.output(FAILED_BODY, HealthcareIOError.of(httpBody, new IOException(resource))); + context.output( + Write.FAILED_BODY, HealthcareIOError.of(httpBody, new IOException(resource))); } } @@ -1259,8 +1253,7 @@ public void importBatch( client.importFhirResource( fhirStore.get(), importUri.toString(), contentStructure.name()); client.pollOperation(operation, 500L); - // Clean up temp files on GCS as they we successfully imported to FHIR store and - // no longer + // Clean up temp files on GCS as they we successfully imported to FHIR store and no longer // needed. FileSystems.delete(tempDestinations); } catch (IOException | InterruptedException e) { @@ -1273,8 +1266,7 @@ public void importBatch( FileSystems.rename(tempDestinations, deadLetterDestinations); output.output(HealthcareIOError.of(importUri.toString(), e)); } finally { - // If we've reached this point files have either been successfully import to - // FHIR store + // If we've reached this point files have either been successfully import to FHIR store // or moved to Dead Letter Queue. // Clean up original files for this batch on GCS. FileSystems.delete(ImmutableList.copyOf(batch)); From e4768a4eb497324fe0920b33f8a73d0f41417ce1 Mon Sep 17 00:00:00 2001 From: Mackenzie Clark Date: Mon, 22 Feb 2021 16:52:05 +0000 Subject: [PATCH 4/7] Adding the tuple tag check for FhirIO.Write.Result creation. --- .../beam/sdk/io/gcp/healthcare/FhirIO.java | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 395ee95b62d2..62a3710c3568 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -601,17 +601,6 @@ public static class Result implements POutput { private final PCollection> failedBodies; private final PCollection> failedFiles; - /** - * Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}. - * - * @param pipeline the pipeline - * @param failedBodies the failed inserts - * @return the result - */ - static Result in(Pipeline pipeline, PCollection> failedBodies) { - return new Result(pipeline, null, failedBodies, null); - } - /** * Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}. * @@ -619,8 +608,16 @@ static Result in(Pipeline pipeline, PCollection> faile * @param bodies the successful and failing bodies results. * @return the result */ - static Result in(Pipeline pipeline, PCollectionTuple bodies) { - return new Result(pipeline, bodies.get(SUCCESSFUL_BODY), bodies.get(FAILED_BODY), null); + static Result in(Pipeline pipeline, PCollectionTuple bodies) throws IllegalArgumentException { + if (bodies.getAll() + .keySet() + .containsAll((Collection) TupleTagList.of(SUCCESSFUL_BODY).and(FAILED_BODY))) { + return new Result(pipeline, bodies.get(SUCCESSFUL_BODY), bodies.get(FAILED_BODY), null); + } else { + throw new IllegalArgumentException( + "The PCollection tuple bodies must have the FhirIO.Write.SUCCESSFUL_BODY " + + "and FhirIO.Write.FAILED_BODY tuple tags."); + } } static Result in( From 95201ace98b2abc273e5ed7747a7fbeb6803ed7b Mon Sep 17 00:00:00 2001 From: Mackenzie Clark Date: Mon, 22 Feb 2021 17:04:16 +0000 Subject: [PATCH 5/7] Updating the contains TupleTag check to use the PCollectionTuple .has() method, casting TupleTagList -> Collection creates an exception for some. --- .../apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 62a3710c3568..901d6dc9563a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -402,9 +402,7 @@ public static class Result implements POutput, PInput { * @throws IllegalArgumentException the illegal argument exception */ static FhirIO.Read.Result of(PCollectionTuple pct) throws IllegalArgumentException { - if (pct.getAll() - .keySet() - .containsAll((Collection) TupleTagList.of(OUT).and(DEAD_LETTER))) { + if (pct.has(OUT) && pct.has(DEAD_LETTER)) { return new FhirIO.Read.Result(pct); } else { throw new IllegalArgumentException( @@ -609,9 +607,7 @@ public static class Result implements POutput { * @return the result */ static Result in(Pipeline pipeline, PCollectionTuple bodies) throws IllegalArgumentException { - if (bodies.getAll() - .keySet() - .containsAll((Collection) TupleTagList.of(SUCCESSFUL_BODY).and(FAILED_BODY))) { + if (bodies.has(SUCCESSFUL_BODY) && bodies.has(FAILED_BODY))) { return new Result(pipeline, bodies.get(SUCCESSFUL_BODY), bodies.get(FAILED_BODY), null); } else { throw new IllegalArgumentException( @@ -1525,9 +1521,7 @@ public static class Result implements POutput, PInput { * @throws IllegalArgumentException the illegal argument exception */ static FhirIO.Search.Result of(PCollectionTuple pct) throws IllegalArgumentException { - if (pct.getAll() - .keySet() - .containsAll((Collection) TupleTagList.of(OUT).and(DEAD_LETTER))) { + if (pct.has(OUT) && pct.has(DEAD_LETTER)) { return new FhirIO.Search.Result(pct); } else { throw new IllegalArgumentException( From 05f6aaf7fb41ba60654e12a296b9df3147309f02 Mon Sep 17 00:00:00 2001 From: Mackenzie Clark Date: Mon, 22 Feb 2021 17:49:38 +0000 Subject: [PATCH 6/7] Fix build --- .../main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 901d6dc9563a..3ed6bc872919 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -607,7 +607,7 @@ public static class Result implements POutput { * @return the result */ static Result in(Pipeline pipeline, PCollectionTuple bodies) throws IllegalArgumentException { - if (bodies.has(SUCCESSFUL_BODY) && bodies.has(FAILED_BODY))) { + if (bodies.has(SUCCESSFUL_BODY) && bodies.has(FAILED_BODY)) { return new Result(pipeline, bodies.get(SUCCESSFUL_BODY), bodies.get(FAILED_BODY), null); } else { throw new IllegalArgumentException( From 655b29b00709c0c7be04f01f4b00e7e9f5f1b499 Mon Sep 17 00:00:00 2001 From: Mackenzie Clark Date: Mon, 22 Feb 2021 18:43:49 +0000 Subject: [PATCH 7/7] Running spotless apply --- .../main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 3ed6bc872919..4f99435e21c9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -33,7 +33,6 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List;