Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,15 @@ private String processBreedbasePedigree(String pedigree) {
public List<BrAPIGermplasm> createBrAPIGermplasm(List<BrAPIGermplasm> postBrAPIGermplasmList, UUID programId, ImportUpload upload) {
GermplasmApi api = brAPIEndpointProvider.get(programDAO.getCoreClient(programId), GermplasmApi.class);
var program = programDAO.fetchOneById(programId);
Callable<Map<String, BrAPIGermplasm>> postFunction = null;
try {
if (!postBrAPIGermplasmList.isEmpty()) {
postFunction = () -> {
Callable<Map<String, BrAPIGermplasm>> postFunction = () -> {
List<BrAPIGermplasm> postResponse = brAPIDAOUtil.post(postBrAPIGermplasmList, upload, api::germplasmPost, importDAO::update);
return processGermplasmForDisplay(postResponse, program.getKey());
};
return programGermplasmCache.post(programId, postFunction);
}
return programGermplasmCache.post(programId, postFunction);
return new ArrayList<>();
} catch (Exception e) {
throw new InternalServerException("Unknown error has occurred: " + e.getMessage(), e);
}
Expand All @@ -311,15 +311,15 @@ public List<BrAPIGermplasm> createBrAPIGermplasm(List<BrAPIGermplasm> postBrAPIG
public List<BrAPIGermplasm> updateBrAPIGermplasm(List<BrAPIGermplasm> putBrAPIGermplasmList, UUID programId, ImportUpload upload) {
GermplasmApi api = brAPIEndpointProvider.get(programDAO.getCoreClient(programId), GermplasmApi.class);
var program = programDAO.fetchOneById(programId);
Callable<Map<String, BrAPIGermplasm>> postFunction = null;
try {
if (!putBrAPIGermplasmList.isEmpty()) {
postFunction = () -> {
Callable<Map<String, BrAPIGermplasm>> postFunction = () -> {
List<BrAPIGermplasm> putResponse = putGermplasm(putBrAPIGermplasmList, api);
return processGermplasmForDisplay(putResponse, program.getKey());
};
return programGermplasmCache.post(programId, postFunction);
}
return programGermplasmCache.post(programId, postFunction);
return new ArrayList<>();
} catch (Exception e) {
throw new InternalServerException("Unknown error has occurred: " + e.getMessage(), e);
}
Expand Down
191 changes: 154 additions & 37 deletions src/main/java/org/breedinginsight/brapi/v2/dao/BrAPIObservationDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,28 @@
*/
package org.breedinginsight.brapi.v2.dao;

import io.micronaut.context.annotation.Property;
import io.micronaut.http.server.exceptions.InternalServerException;
import io.micronaut.scheduling.annotation.Scheduled;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.brapi.client.v2.ApiResponse;
import org.brapi.client.v2.model.exceptions.ApiException;
import org.brapi.client.v2.modules.phenotype.ObservationsApi;
import org.brapi.v2.model.BrAPIAcceptedSearchResponse;
import org.brapi.v2.model.BrAPIExternalReference;
import org.brapi.v2.model.pheno.BrAPIObservation;
import org.brapi.v2.model.pheno.BrAPIObservationUnit;
import org.brapi.v2.model.pheno.request.BrAPIObservationSearchRequest;
import org.brapi.v2.model.pheno.response.BrAPIObservationListResponse;
import org.brapi.v2.model.pheno.response.BrAPIObservationSingleResponse;
import org.breedinginsight.brapps.importer.daos.ImportDAO;
import org.breedinginsight.brapps.importer.model.ImportUpload;
import org.breedinginsight.brapps.importer.services.ExternalReferenceSource;
import org.breedinginsight.daos.ProgramDAO;
import org.breedinginsight.daos.cache.ProgramCache;
import org.breedinginsight.daos.cache.ProgramCacheProvider;
import org.breedinginsight.model.Program;
import org.breedinginsight.services.brapi.BrAPIEndpointProvider;
import org.breedinginsight.utilities.BrAPIDAOUtil;
Expand All @@ -38,6 +47,8 @@
import javax.inject.Inject;
import javax.inject.Singleton;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

import static org.brapi.v2.model.BrAPIWSMIMEDataTypes.APPLICATION_JSON;

Expand All @@ -47,17 +58,115 @@ public class BrAPIObservationDAO {

private ProgramDAO programDAO;
private ImportDAO importDAO;
private BrAPIObservationUnitDAO observationUnitDAO;
private final BrAPIDAOUtil brAPIDAOUtil;
private final BrAPIEndpointProvider brAPIEndpointProvider;
private final String referenceSource;
private boolean runScheduledTasks;
private final ProgramCache<BrAPIObservation> programObservationCache;

@Inject
public BrAPIObservationDAO(ProgramDAO programDAO, ImportDAO importDAO, BrAPIDAOUtil brAPIDAOUtil, BrAPIEndpointProvider brAPIEndpointProvider) {
public BrAPIObservationDAO(ProgramDAO programDAO,
ImportDAO importDAO,
BrAPIObservationUnitDAO observationUnitDAO,
BrAPIDAOUtil brAPIDAOUtil,
BrAPIEndpointProvider brAPIEndpointProvider,
@Property(name = "brapi.server.reference-source") String referenceSource,
@Property(name = "micronaut.bi.api.run-scheduled-tasks") boolean runScheduledTasks,
ProgramCacheProvider programCacheProvider) {
this.programDAO = programDAO;
this.importDAO = importDAO;
this.observationUnitDAO = observationUnitDAO;
this.brAPIDAOUtil = brAPIDAOUtil;
this.brAPIEndpointProvider = brAPIEndpointProvider;
this.referenceSource = referenceSource;
this.runScheduledTasks = runScheduledTasks;
this.programObservationCache = programCacheProvider.getProgramCache(this::fetchProgramObservations, BrAPIObservation.class);
}

@Scheduled(initialDelay = "3s")
public void setup() {
if(!runScheduledTasks) {
return;
}
// Populate the observation cache for all programs on startup.
log.debug("populating observation cache");
List<Program> programs = programDAO.getActive();
if (programs != null) {
programObservationCache.populate(programs.stream().map(Program::getId).collect(Collectors.toList()));
}
}

/**
* Fetch formatted observations for this program.
*/
private Map<String, BrAPIObservation> fetchProgramObservations(UUID programId) throws ApiException {
ObservationsApi api = brAPIEndpointProvider.get(programDAO.getCoreClient(programId), ObservationsApi.class);
// Get the program.
List<Program> programs = programDAO.get(programId);
if (programs.size() != 1) {
throw new InternalServerException("Program was not found for given id");
}
Program program = programs.get(0);

// Set query params and make call.
BrAPIObservationSearchRequest observationSearch = new BrAPIObservationSearchRequest();
observationSearch.externalReferenceIds(List.of(programId.toString()));
observationSearch.externalReferenceSources(List.of(Utilities.generateReferenceSource(referenceSource, ExternalReferenceSource.PROGRAMS)));
return processObservationsForCache(brAPIDAOUtil.search(
api::searchObservationsPost,
api::searchObservationsSearchResultsDbIdGet,
observationSearch
), program.getKey());
}

/**
* Process a list of observations for insertion into the cache.
*/
private Map<String, BrAPIObservation> processObservationsForCache(List<BrAPIObservation> programObservations, String programKey) {
// Process programObservations in place (strip program key, etc.).
processObservations(programKey, programObservations);
// Build map.
Map<String, BrAPIObservation> programObservationsMap = new HashMap<>();
log.trace("processing observationUnits for cache: " + programObservations);
for (BrAPIObservation observation: programObservations) {
BrAPIExternalReference xref = observation
.getExternalReferences()
.stream()
.filter(reference -> String.format("%s/%s", referenceSource, ExternalReferenceSource.OBSERVATIONS.getName()).equals(reference.getReferenceSource()))
.findFirst().orElseThrow(() -> new IllegalStateException("No BI external reference found"));
programObservationsMap.put(xref.getReferenceId(), observation);
}
return programObservationsMap;
}

/**
* Process BrAPIObservations for use in DeltaBreed (e.g. strip program key).
*/
private void processObservations(String programKey, List<BrAPIObservation> observations) {
for (BrAPIObservation obs: observations) {
// Strip program key from observationVariableName.
if (StringUtils.isNotBlank(obs.getObservationVariableName())) {
obs.setObservationVariableName(Utilities.removeProgramKey(obs.getObservationVariableName(), programKey));
}
// Strip program key and unknown info from germplasmName and observationUnitName.
if (StringUtils.isNotBlank(obs.getGermplasmName())) {
obs.setGermplasmName(Utilities.removeProgramKeyAndUnknownAdditionalData(obs.getGermplasmName(), programKey));
}
if (StringUtils.isNotBlank(obs.getObservationUnitName())) {
obs.setObservationUnitName(Utilities.removeProgramKeyAndUnknownAdditionalData(obs.getObservationUnitName(), programKey));
}
}
}

/**
* Get all observations for a program from the cache.
*/
private Map<String, BrAPIObservation> getProgramObservations(UUID programId) throws ApiException {
return programObservationCache.get(programId);
}

// Note: not using cache, because unique studyName (with "[ProgramKey-ExtraInfo]") is not stored directly on Observation.
public List<BrAPIObservation> getObservationsByStudyName(List<String> studyNames, Program program) throws ApiException {
if(studyNames.isEmpty()) {
return Collections.emptyList();
Expand All @@ -78,33 +187,22 @@ public List<BrAPIObservation> getObservationsByTrialDbId(List<String> trialDbIds
if(trialDbIds.isEmpty()) {
return Collections.emptyList();
}

BrAPIObservationSearchRequest observationSearchRequest = new BrAPIObservationSearchRequest();
observationSearchRequest.setProgramDbIds(List.of(program.getBrapiProgram().getProgramDbId()));
observationSearchRequest.setTrialDbIds(new ArrayList<>(trialDbIds));
ObservationsApi api = brAPIEndpointProvider.get(programDAO.getCoreClient(program.getId()), ObservationsApi.class);
return brAPIDAOUtil.search(
api::searchObservationsPost,
(brAPIWSMIMEDataTypes, searchResultsDbId, page, pageSize) -> searchObservationsSearchResultsDbIdGet(program.getId(), searchResultsDbId, page, pageSize),
observationSearchRequest
);
// First, get all ObservationUnits for the given trialDbIds.
List<String> observationUnitDbIds = observationUnitDAO.getObservationUnitsForTrialDbIds(program.getId(), trialDbIds)
.stream().map(BrAPIObservationUnit::getObservationUnitDbId).collect(Collectors.toList());
// Finally, return all Observations for those ObservationUnits (Observations are linked to Trial through ObservationUnits).
return getProgramObservations(program.getId()).values().stream()
.filter(o -> observationUnitDbIds.contains(o.getObservationUnitDbId()))
.collect(Collectors.toList());
}

public List<BrAPIObservation> getObservationsByObservationUnitsAndVariables(Collection<String> ouDbIds, Collection<String> variableDbIds, Program program) throws ApiException {
if(ouDbIds.isEmpty() || variableDbIds.isEmpty()) {
return Collections.emptyList();
}

BrAPIObservationSearchRequest observationSearchRequest = new BrAPIObservationSearchRequest();
observationSearchRequest.setProgramDbIds(List.of(program.getBrapiProgram().getProgramDbId()));
observationSearchRequest.setObservationUnitDbIds(new ArrayList<>(ouDbIds));
observationSearchRequest.setObservationVariableDbIds(new ArrayList<>(variableDbIds));
ObservationsApi api = brAPIEndpointProvider.get(programDAO.getCoreClient(program.getId()), ObservationsApi.class);
return brAPIDAOUtil.search(
api::searchObservationsPost,
(brAPIWSMIMEDataTypes, searchResultsDbId, page, pageSize) -> searchObservationsSearchResultsDbIdGet(program.getId(), searchResultsDbId, page, pageSize),
observationSearchRequest
);
return getProgramObservations(program.getId()).values().stream()
.filter(o -> ouDbIds.contains(o.getObservationUnitDbId()) && variableDbIds.contains(o.getObservationVariableDbId()))
.collect(Collectors.toList());
}

@NotNull
Expand All @@ -115,28 +213,47 @@ private ApiResponse<Pair<Optional<BrAPIObservationListResponse>, Optional<BrAPIA

public List<BrAPIObservation> createBrAPIObservations(List<BrAPIObservation> brAPIObservationList, UUID programId, ImportUpload upload) throws ApiException {
ObservationsApi api = brAPIEndpointProvider.get(programDAO.getCoreClient(programId), ObservationsApi.class);
return brAPIDAOUtil.post(brAPIObservationList, upload, api::observationsPost, importDAO::update);
var program = programDAO.fetchOneById(programId);
try {
if (!brAPIObservationList.isEmpty()) {
Callable<Map<String, BrAPIObservation>> postFunction = () -> {
List<BrAPIObservation> postResponse = brAPIDAOUtil.post(brAPIObservationList, upload, api::observationsPost, importDAO::update);
return processObservationsForCache(postResponse, program.getKey());
};
return programObservationCache.post(programId, postFunction);
}
return new ArrayList<>();
} catch (Exception e) {
throw new InternalServerException("Unknown error has occurred: " + e.getMessage(), e);
}
}

public BrAPIObservation updateBrAPIObservation(String dbId, BrAPIObservation observation, UUID programId) throws ApiException {
ObservationsApi api = brAPIEndpointProvider.get(programDAO.getCoreClient(programId), ObservationsApi.class);
ApiResponse<BrAPIObservationSingleResponse> response;
BrAPIObservation updatedObservation = null;
var program = programDAO.fetchOneById(programId);
try {
response = api.observationsObservationDbIdPut(dbId, observation);
if (response != null) {
BrAPIObservationSingleResponse body = response.getBody();
if (body == null) {
throw new ApiException("Response is missing body", 0, response.getHeaders(), null);
}
updatedObservation = body.getResult();
if (updatedObservation == null) {
throw new ApiException("Response body is missing result", 0, response.getHeaders(), response.getBody().toString());
}
}
Callable<Map<String, BrAPIObservation>> postFunction = () -> {
ApiResponse<BrAPIObservationSingleResponse> response = api.observationsObservationDbIdPut(dbId, observation);
if (response == null)
{
throw new ApiException("Response is null", 0, null, null);
}
BrAPIObservationSingleResponse body = response.getBody();
if (body == null) {
throw new ApiException("Response is missing body", 0, response.getHeaders(), null);
}
BrAPIObservation updatedObservation = body.getResult();
if (updatedObservation == null) {
throw new ApiException("Response body is missing result", 0, response.getHeaders(), response.getBody().toString());
}
return processObservationsForCache(List.of(updatedObservation), program.getKey());
};
return programObservationCache.post(programId, postFunction).get(0);
} catch (ApiException e) {
log.error(Utilities.generateApiExceptionLogMessage(e));
throw new InternalServerException("Unknown error has occurred: " + e.getMessage(), e);
} catch (Exception e) {
throw new InternalServerException("Unknown error has occurred: " + e.getMessage(), e);
}
return updatedObservation;
}
}
Loading