diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java index 359ebf420fbd..4f99435e21c9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/FhirIO.java @@ -33,7 +33,6 @@ import java.nio.charset.StandardCharsets; import java.time.Instant; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -402,9 +401,7 @@ public static class Result implements POutput, PInput { * @throws IllegalArgumentException the illegal argument exception */ static FhirIO.Read.Result of(PCollectionTuple pct) throws IllegalArgumentException { - if (pct.getAll() - .keySet() - .containsAll((Collection) TupleTagList.of(OUT).and(DEAD_LETTER))) { + if (pct.has(OUT) && pct.has(DEAD_LETTER)) { return new FhirIO.Read.Result(pct); } else { throw new IllegalArgumentException( @@ -568,13 +565,15 @@ private String fetchResource(HealthcareApiClient client, String resourceId) @AutoValue public abstract static class Write extends PTransform, Write.Result> { - /** The tag for the failed writes to FHIR store`. */ + /** The tag for successful writes to FHIR store. */ + public static final TupleTag SUCCESSFUL_BODY = new TupleTag() {}; + /** The tag for the failed writes to FHIR store. */ public static final TupleTag> FAILED_BODY = new TupleTag>() {}; - /** The tag for the files that failed to FHIR store`. */ + /** The tag for the files that failed to FHIR store. */ public static final TupleTag> FAILED_FILES = new TupleTag>() {}; - /** The tag for temp files for import to FHIR store`. */ + /** The tag for temp files for import to FHIR store. */ public static final TupleTag TEMP_FILES = new TupleTag() {}; /** The enum Write method. */ @@ -595,25 +594,41 @@ public enum WriteMethod { /** The type Result. */ public static class Result implements POutput { private final Pipeline pipeline; + private final PCollection successfulBodies; private final PCollection> failedBodies; private final PCollection> failedFiles; /** - * Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}. @param pipeline the - * pipeline + * Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}. * - * @param failedBodies the failed inserts + * @param pipeline the pipeline + * @param bodies the successful and failing bodies results. * @return the result */ - static Result in(Pipeline pipeline, PCollection> failedBodies) { - return new Result(pipeline, failedBodies, null); + static Result in(Pipeline pipeline, PCollectionTuple bodies) throws IllegalArgumentException { + if (bodies.has(SUCCESSFUL_BODY) && bodies.has(FAILED_BODY)) { + return new Result(pipeline, bodies.get(SUCCESSFUL_BODY), bodies.get(FAILED_BODY), null); + } else { + throw new IllegalArgumentException( + "The PCollection tuple bodies must have the FhirIO.Write.SUCCESSFUL_BODY " + + "and FhirIO.Write.FAILED_BODY tuple tags."); + } } static Result in( Pipeline pipeline, PCollection> failedBodies, PCollection> failedFiles) { - return new Result(pipeline, failedBodies, failedFiles); + return new Result(pipeline, null, failedBodies, failedFiles); + } + + /** + * Gets successful bodies from Write. + * + * @return the entries that were inserted + */ + public PCollection getSuccessfulBodies() { + return this.successfulBodies; } /** @@ -641,7 +656,13 @@ public Pipeline getPipeline() { @Override public Map, PValue> expand() { - return ImmutableMap.of(Write.FAILED_BODY, failedBodies, Write.FAILED_FILES, failedFiles); + return ImmutableMap.of( + SUCCESSFUL_BODY, + successfulBodies, + FAILED_BODY, + failedBodies, + Write.FAILED_FILES, + failedFiles); } @Override @@ -650,9 +671,15 @@ public void finishSpecifyingOutput( private Result( Pipeline pipeline, + @Nullable PCollection successfulBodies, PCollection> failedBodies, @Nullable PCollection> failedFiles) { this.pipeline = pipeline; + if (successfulBodies == null) { + successfulBodies = + (PCollection) pipeline.apply(Create.empty(StringUtf8Coder.of())); + } + this.successfulBodies = successfulBodies; this.failedBodies = failedBodies; if (failedFiles == null) { failedFiles = @@ -808,7 +835,7 @@ public static Write fhirStoresImport( * Execute Bundle Method executes a batch of requests as a single transaction @see . * - * @param fhirStore the hl 7 v 2 store + * @param fhirStore the fhir store * @return the write */ public static Write executeBundles(String fhirStore) { @@ -835,8 +862,7 @@ public static Write executeBundles(ValueProvider fhirStore) { @Override public Result expand(PCollection input) { - PCollection> failedBundles; - PCollection> failedImports; + PCollectionTuple bundles; switch (this.getWriteMethod()) { case IMPORT: LOG.warn( @@ -854,14 +880,15 @@ public Result expand(PCollection input) { return input.apply(new Import(getFhirStore(), tempPath, deadPath, contentStructure)); case EXECUTE_BUNDLE: default: - failedBundles = - input - .apply( - "Execute FHIR Bundles", - ParDo.of(new ExecuteBundles.ExecuteBundlesFn(this.getFhirStore()))) - .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); + bundles = + input.apply( + "Execute FHIR Bundles", + ParDo.of(new ExecuteBundles.ExecuteBundlesFn(this.getFhirStore())) + .withOutputTags(SUCCESSFUL_BODY, TupleTagList.of(FAILED_BODY))); + bundles.get(SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of()); + bundles.get(FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); } - return Result.in(input.getPipeline(), failedBundles); + return Result.in(input.getPipeline(), bundles); } } @@ -963,7 +990,7 @@ Optional> getImportGcsDeadLetterPath() { public Write.Result expand(PCollection input) { checkState( input.isBounded() == IsBounded.BOUNDED, - "FhirIO.Import should only be used on unbounded PCollections as it is" + "FhirIO.Import should only be used on bounded PCollections as it is" + "intended for batch use only."); // fall back on pipeline's temp location. @@ -1017,13 +1044,11 @@ public Write.Result expand(PCollection input) { @ProcessElement public void delete(@Element Metadata path, ProcessContext context) { // Wait til window closes for failedBodies and failedFiles to ensure we are - // done processing - // anything under tempGcsPath because it has been successfully imported to - // FHIR store or - // copies have been moved to the dead letter path. + // done processing anything under tempGcsPath because it has been + // successfully imported to FHIR store or copies have been moved to the + // dead letter path. // Clean up all of tempGcsPath. This will handle removing phantom temporary - // objects from - // failed / rescheduled ImportFn::importBatch. + // objects from failed / rescheduled ImportFn::importBatch. try { FileSystems.delete( Collections.singleton(path.resourceId()), @@ -1287,15 +1312,17 @@ public static class ExecuteBundles extends PTransform, Write @Override public FhirIO.Write.Result expand(PCollection input) { - return Write.Result.in( - input.getPipeline(), - input - .apply(ParDo.of(new ExecuteBundlesFn(fhirStore))) - .setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()))); + PCollectionTuple bodies = + input.apply( + ParDo.of(new ExecuteBundlesFn(fhirStore)) + .withOutputTags(Write.SUCCESSFUL_BODY, TupleTagList.of(Write.FAILED_BODY))); + bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of()); + bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())); + return Write.Result.in(input.getPipeline(), bodies); } /** The type Write Fhir fn. */ - static class ExecuteBundlesFn extends DoFn> { + static class ExecuteBundlesFn extends DoFn { private static final Counter EXECUTE_BUNDLE_ERRORS = Metrics.counter( @@ -1324,7 +1351,7 @@ static class ExecuteBundlesFn extends DoFn> { /** * Initialize healthcare client. * - * @throws IOException the io exception + * @throws IOException If the Healthcare client cannot be created. */ @Setup public void initClient() throws IOException { @@ -1341,9 +1368,10 @@ public void executeBundles(ProcessContext context) { client.executeFhirBundle(fhirStore.get(), body); EXECUTE_BUNDLE_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime); EXECUTE_BUNDLE_SUCCESS.inc(); + context.output(Write.SUCCESSFUL_BODY, body); } catch (IOException | HealthcareHttpException e) { EXECUTE_BUNDLE_ERRORS.inc(); - context.output(HealthcareIOError.of(body, e)); + context.output(Write.FAILED_BODY, HealthcareIOError.of(body, e)); } } } @@ -1492,9 +1520,7 @@ public static class Result implements POutput, PInput { * @throws IllegalArgumentException the illegal argument exception */ static FhirIO.Search.Result of(PCollectionTuple pct) throws IllegalArgumentException { - if (pct.getAll() - .keySet() - .containsAll((Collection) TupleTagList.of(OUT).and(DEAD_LETTER))) { + if (pct.has(OUT) && pct.has(DEAD_LETTER)) { return new FhirIO.Search.Result(pct); } else { throw new IllegalArgumentException(