Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -402,9 +401,7 @@ public static class Result implements POutput, PInput {
* @throws IllegalArgumentException the illegal argument exception
*/
static FhirIO.Read.Result of(PCollectionTuple pct) throws IllegalArgumentException {
if (pct.getAll()
.keySet()
.containsAll((Collection<?>) TupleTagList.of(OUT).and(DEAD_LETTER))) {
if (pct.has(OUT) && pct.has(DEAD_LETTER)) {
return new FhirIO.Read.Result(pct);
} else {
throw new IllegalArgumentException(
Expand Down Expand Up @@ -568,13 +565,15 @@ private String fetchResource(HealthcareApiClient client, String resourceId)
@AutoValue
public abstract static class Write extends PTransform<PCollection<String>, Write.Result> {

/** The tag for the failed writes to FHIR store`. */
/** The tag for successful writes to FHIR store. */
public static final TupleTag<String> SUCCESSFUL_BODY = new TupleTag<String>() {};
/** The tag for the failed writes to FHIR store. */
public static final TupleTag<HealthcareIOError<String>> FAILED_BODY =
new TupleTag<HealthcareIOError<String>>() {};
/** The tag for the files that failed to FHIR store`. */
/** The tag for the files that failed to FHIR store. */
public static final TupleTag<HealthcareIOError<String>> FAILED_FILES =
new TupleTag<HealthcareIOError<String>>() {};
/** The tag for temp files for import to FHIR store`. */
/** The tag for temp files for import to FHIR store. */
public static final TupleTag<ResourceId> TEMP_FILES = new TupleTag<ResourceId>() {};

/** The enum Write method. */
Expand All @@ -595,25 +594,41 @@ public enum WriteMethod {
/** The type Result. */
public static class Result implements POutput {
private final Pipeline pipeline;
private final PCollection<String> successfulBodies;
private final PCollection<HealthcareIOError<String>> failedBodies;
private final PCollection<HealthcareIOError<String>> failedFiles;

/**
* Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}. @param pipeline the
* pipeline
* Creates a {@link FhirIO.Write.Result} in the given {@link Pipeline}.
*
* @param failedBodies the failed inserts
* @param pipeline the pipeline
* @param bodies the successful and failing bodies results.
* @return the result
*/
static Result in(Pipeline pipeline, PCollection<HealthcareIOError<String>> failedBodies) {
return new Result(pipeline, failedBodies, null);
static Result in(Pipeline pipeline, PCollectionTuple bodies) throws IllegalArgumentException {
if (bodies.has(SUCCESSFUL_BODY) && bodies.has(FAILED_BODY)) {
return new Result(pipeline, bodies.get(SUCCESSFUL_BODY), bodies.get(FAILED_BODY), null);
} else {
throw new IllegalArgumentException(
"The PCollection tuple bodies must have the FhirIO.Write.SUCCESSFUL_BODY "
+ "and FhirIO.Write.FAILED_BODY tuple tags.");
}
}

static Result in(
Pipeline pipeline,
PCollection<HealthcareIOError<String>> failedBodies,
PCollection<HealthcareIOError<String>> failedFiles) {
return new Result(pipeline, failedBodies, failedFiles);
return new Result(pipeline, null, failedBodies, failedFiles);
}

/**
* Gets successful bodies from Write.
*
* @return the entries that were inserted
*/
public PCollection<String> getSuccessfulBodies() {
return this.successfulBodies;
}

/**
Expand Down Expand Up @@ -641,7 +656,13 @@ public Pipeline getPipeline() {

@Override
public Map<TupleTag<?>, PValue> expand() {
return ImmutableMap.of(Write.FAILED_BODY, failedBodies, Write.FAILED_FILES, failedFiles);
return ImmutableMap.of(
SUCCESSFUL_BODY,
successfulBodies,
FAILED_BODY,
failedBodies,
Write.FAILED_FILES,
failedFiles);
}

@Override
Expand All @@ -650,9 +671,15 @@ public void finishSpecifyingOutput(

private Result(
Pipeline pipeline,
@Nullable PCollection<String> successfulBodies,
PCollection<HealthcareIOError<String>> failedBodies,
@Nullable PCollection<HealthcareIOError<String>> failedFiles) {
this.pipeline = pipeline;
if (successfulBodies == null) {
successfulBodies =
(PCollection<String>) pipeline.apply(Create.empty(StringUtf8Coder.of()));
}
this.successfulBodies = successfulBodies;
this.failedBodies = failedBodies;
if (failedFiles == null) {
failedFiles =
Expand Down Expand Up @@ -808,7 +835,7 @@ public static Write fhirStoresImport(
* Execute Bundle Method executes a batch of requests as a single transaction @see <a
* href=https://cloud.google.com/healthcare/docs/reference/rest/v1beta1/projects.locations.datasets.fhirStores.fhir/executeBundle></a>.
*
* @param fhirStore the hl 7 v 2 store
* @param fhirStore the fhir store
* @return the write
*/
public static Write executeBundles(String fhirStore) {
Expand All @@ -835,8 +862,7 @@ public static Write executeBundles(ValueProvider<String> fhirStore) {

@Override
public Result expand(PCollection<String> input) {
PCollection<HealthcareIOError<String>> failedBundles;
PCollection<HealthcareIOError<String>> failedImports;
PCollectionTuple bundles;
switch (this.getWriteMethod()) {
case IMPORT:
LOG.warn(
Expand All @@ -854,14 +880,15 @@ public Result expand(PCollection<String> input) {
return input.apply(new Import(getFhirStore(), tempPath, deadPath, contentStructure));
case EXECUTE_BUNDLE:
default:
failedBundles =
input
.apply(
"Execute FHIR Bundles",
ParDo.of(new ExecuteBundles.ExecuteBundlesFn(this.getFhirStore())))
.setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
bundles =
input.apply(
"Execute FHIR Bundles",
ParDo.of(new ExecuteBundles.ExecuteBundlesFn(this.getFhirStore()))
.withOutputTags(SUCCESSFUL_BODY, TupleTagList.of(FAILED_BODY)));
bundles.get(SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of());
bundles.get(FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
}
return Result.in(input.getPipeline(), failedBundles);
return Result.in(input.getPipeline(), bundles);
}
}

Expand Down Expand Up @@ -963,7 +990,7 @@ Optional<ValueProvider<String>> getImportGcsDeadLetterPath() {
public Write.Result expand(PCollection<String> input) {
checkState(
input.isBounded() == IsBounded.BOUNDED,
"FhirIO.Import should only be used on unbounded PCollections as it is"
"FhirIO.Import should only be used on bounded PCollections as it is"
+ "intended for batch use only.");

// fall back on pipeline's temp location.
Expand Down Expand Up @@ -1017,13 +1044,11 @@ public Write.Result expand(PCollection<String> input) {
@ProcessElement
public void delete(@Element Metadata path, ProcessContext context) {
// Wait til window closes for failedBodies and failedFiles to ensure we are
// done processing
// anything under tempGcsPath because it has been successfully imported to
// FHIR store or
// copies have been moved to the dead letter path.
// done processing anything under tempGcsPath because it has been
// successfully imported to FHIR store or copies have been moved to the
// dead letter path.
// Clean up all of tempGcsPath. This will handle removing phantom temporary
// objects from
// failed / rescheduled ImportFn::importBatch.
// objects from failed / rescheduled ImportFn::importBatch.
try {
FileSystems.delete(
Collections.singleton(path.resourceId()),
Expand Down Expand Up @@ -1287,15 +1312,17 @@ public static class ExecuteBundles extends PTransform<PCollection<String>, Write

@Override
public FhirIO.Write.Result expand(PCollection<String> input) {
return Write.Result.in(
input.getPipeline(),
input
.apply(ParDo.of(new ExecuteBundlesFn(fhirStore)))
.setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of())));
PCollectionTuple bodies =
input.apply(
ParDo.of(new ExecuteBundlesFn(fhirStore))
.withOutputTags(Write.SUCCESSFUL_BODY, TupleTagList.of(Write.FAILED_BODY)));
bodies.get(Write.SUCCESSFUL_BODY).setCoder(StringUtf8Coder.of());
bodies.get(Write.FAILED_BODY).setCoder(HealthcareIOErrorCoder.of(StringUtf8Coder.of()));
return Write.Result.in(input.getPipeline(), bodies);
}

/** The type Write Fhir fn. */
static class ExecuteBundlesFn extends DoFn<String, HealthcareIOError<String>> {
static class ExecuteBundlesFn extends DoFn<String, String> {

private static final Counter EXECUTE_BUNDLE_ERRORS =
Metrics.counter(
Expand Down Expand Up @@ -1324,7 +1351,7 @@ static class ExecuteBundlesFn extends DoFn<String, HealthcareIOError<String>> {
/**
* Initialize healthcare client.
*
* @throws IOException the io exception
* @throws IOException If the Healthcare client cannot be created.
*/
@Setup
public void initClient() throws IOException {
Expand All @@ -1341,9 +1368,10 @@ public void executeBundles(ProcessContext context) {
client.executeFhirBundle(fhirStore.get(), body);
EXECUTE_BUNDLE_LATENCY_MS.update(Instant.now().toEpochMilli() - startTime);
EXECUTE_BUNDLE_SUCCESS.inc();
context.output(Write.SUCCESSFUL_BODY, body);
} catch (IOException | HealthcareHttpException e) {
EXECUTE_BUNDLE_ERRORS.inc();
context.output(HealthcareIOError.of(body, e));
context.output(Write.FAILED_BODY, HealthcareIOError.of(body, e));
}
}
}
Expand Down Expand Up @@ -1492,9 +1520,7 @@ public static class Result implements POutput, PInput {
* @throws IllegalArgumentException the illegal argument exception
*/
static FhirIO.Search.Result of(PCollectionTuple pct) throws IllegalArgumentException {
if (pct.getAll()
.keySet()
.containsAll((Collection<?>) TupleTagList.of(OUT).and(DEAD_LETTER))) {
if (pct.has(OUT) && pct.has(DEAD_LETTER)) {
return new FhirIO.Search.Result(pct);
} else {
throw new IllegalArgumentException(
Expand Down