Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
docker run --volumes-from mms_test_configs --network container:mms -t postman/newman run permissions.postman_collection.json -e test-env.json --delay-request 500
docker run --volumes-from mms_test_configs --network container:mms -t postman/newman run search.postman_collection.json -e test-env.json --delay-request 1000
docker run --volumes-from mms_test_configs --network container:mms -t postman/newman run artifacts.postman_collection.json -e test-env.json --delay-request 500
docker run --volumes-from mms_test_configs --network container:mms -t postman/newman run elastic.postman_collection.json -e test-env.json --delay-request 500

- persist_to_workspace:
root: /home/circleci/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;

import org.openmbee.mms.json.BaseJson;
import org.openmbee.mms.json.ElementJson;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,18 @@ public MMSException() {
}

public MMSException(HttpStatus code, Object messageObject) {
if (messageObject instanceof Throwable) {
super.initCause((Throwable)messageObject);
}
this.code = code;
this.messageObject = messageObject;

}

public MMSException(int code, Object messageObject) {
if (messageObject instanceof Throwable) {
super.initCause((Throwable)messageObject);
}
this.code = HttpStatus.resolve(code);
this.messageObject = messageObject;
}
Expand All @@ -36,4 +43,12 @@ public void setMessageObject(Object messageObject) {
this.messageObject = messageObject;
}

@Override
public String getMessage() {
if (messageObject == null) {
return super.getMessage();
}
return (messageObject instanceof Throwable) ?
((Throwable)messageObject).getMessage() : messageObject.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public RefJson createBranch(String projectId, RefJson branch) {
for (Node n: nodeRepository.findAllByDeleted(false)) {
docIds.add(n.getDocId());
}
nodeIndex.addToRef(docIds);
try { nodeIndex.addToRef(docIds); } catch(Exception e) {}
eventPublisher.forEach((pub) -> pub.publish(
EventObject.create(projectId, branch.getId(), "branch_created", branch)));
return branch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.openmbee.mms.core.exceptions.BadRequestException;
import org.openmbee.mms.core.exceptions.InternalErrorException;
import org.openmbee.mms.core.objects.ElementsCommitResponse;
import org.openmbee.mms.core.objects.EventObject;
import org.openmbee.mms.core.services.EventService;
Expand All @@ -22,7 +23,6 @@
import org.openmbee.mms.core.config.ContextHolder;
import org.openmbee.mms.core.objects.ElementsRequest;
import org.openmbee.mms.core.objects.ElementsResponse;
import org.openmbee.mms.core.exceptions.InternalErrorException;
import org.openmbee.mms.data.domains.scoped.Commit;
import org.openmbee.mms.data.domains.scoped.CommitType;
import org.openmbee.mms.data.domains.scoped.Node;
Expand All @@ -37,10 +37,6 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;


@Service("defaultNodeService")
public class DefaultNodeService implements NodeService {
Expand Down Expand Up @@ -144,6 +140,7 @@ public void readAsStream(String projectId, String refId,
} else {
stream.write("\n".getBytes(StandardCharsets.UTF_8));
}
stream.close();
}

@Override
Expand Down Expand Up @@ -207,23 +204,18 @@ public ElementsCommitResponse createOrUpdate(String projectId, String refId, Ele
return response;
}

//@Transactional
protected void commitChanges(NodeChangeInfo info) {
//TODO: Test rollback on IndexDAO failure
TransactionDefinition def = new DefaultTransactionDefinition();
TransactionStatus status = this.nodeRepository.getTransactionManager().getTransaction(def);

public void commitChanges(NodeChangeInfo info) {
Map<String, Node> nodes = info.getToSaveNodeMap();
Map<String, ElementJson> json = info.getUpdatedMap();
CommitJson cmjs = info.getCommitJson();
Instant now = info.getNow();
if (!nodes.isEmpty()) {
try {
this.nodeRepository.saveAll(new ArrayList<>(nodes.values()));
if (json != null && !json.isEmpty()) {
this.nodeIndex.indexAll(json.values());
}
this.nodeIndex.removeFromRef(info.getOldDocIds());
try { this.nodeIndex.removeFromRef(info.getOldDocIds()); } catch(Exception e) {}
this.commitIndex.index(cmjs);

Optional<Commit> existing = this.commitRepository.findByCommitId(cmjs.getId());
existing.ifPresentOrElse(
Expand All @@ -240,13 +232,10 @@ protected void commitChanges(NodeChangeInfo info) {
commit.setComment(cmjs.getComment());
this.commitRepository.save(commit);
});
this.commitIndex.index(cmjs);

this.nodeRepository.getTransactionManager().commit(status);
this.nodeRepository.saveAll(new ArrayList<>(nodes.values()));
} catch (Exception e) {
logger.error("commitChanges error: ", e);
this.nodeRepository.getTransactionManager().rollback(status);
throw new InternalErrorException("Error committing transaction");
logger.error("commitChanges error: {}", e.getMessage());
throw new InternalErrorException("Error committing changes: " + e.getMessage());
}
eventPublisher.forEach((pub) -> pub.publish(
EventObject.create(cmjs.getProjectId(), cmjs.getRefId(), "commit", cmjs)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
Expand All @@ -30,6 +29,7 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.openmbee.mms.core.exceptions.InternalErrorException;
import org.openmbee.mms.elastic.utils.Index;
import org.openmbee.mms.json.BaseJson;
import org.slf4j.Logger;
Expand Down Expand Up @@ -79,7 +79,8 @@ public void deleteById(String index, String docId) {
try {
client.delete(new DeleteRequest(index, docId), REQUEST_OPTIONS);
} catch (IOException e) {
throw new RuntimeException(e);
logger.error(e.getMessage(), e);
throw new InternalErrorException(e);
}
}

Expand All @@ -91,7 +92,8 @@ public void deleteAll(String index, Collection<? extends BaseJson> jsons) {
}
client.bulk(bulkIndex, REQUEST_OPTIONS);
} catch (IOException e) {
throw new RuntimeException(e);
logger.error(e.getMessage(), e);
throw new InternalErrorException(e);
}
}

Expand All @@ -102,7 +104,8 @@ public boolean existsById(String index, String docId) {
getRequest.storedFields("_none_");
return client.exists(getRequest, REQUEST_OPTIONS);
} catch (IOException e) {
throw new RuntimeException(e);
logger.error(e.getMessage(), e);
throw new InternalErrorException(e);
}
}

Expand All @@ -118,7 +121,8 @@ public Optional<E> findById(String index, String docId) {
return Optional.empty();
}
} catch (IOException e) {
throw new RuntimeException(e);
logger.error(e.getMessage(), e);
throw new InternalErrorException(e);
}
}

Expand All @@ -145,7 +149,8 @@ public List<E> findAllById(String index, Set<String> docIds) {
}
return listOfResponses;
} catch (IOException e) {
throw new RuntimeException(e);
logger.error(e.getMessage(), e);
throw new InternalErrorException(e);
}
}

Expand All @@ -169,21 +174,15 @@ public void indexAll(String index, Collection<? extends BaseJson> jsons) {
for (BaseJson json : jsons) {
bulkProcessor.add(new IndexRequest(index).id(json.getDocId()).source(json));
}
try {
if (!bulkProcessor.awaitClose(1200L, TimeUnit.SECONDS)) {
logger.error("Timed out in bulk processing");
}
} catch (InterruptedException e) {
logger.error("Index all interrupted: ", e);
}

bulkProcessor.close();
}

public void index(String index, BaseJson<?> json) {
try {
client.index(new IndexRequest(index).id(json.getDocId()).source(json), REQUEST_OPTIONS);
} catch (IOException e) {
throw new RuntimeException(e);
logger.error(e.getMessage(), e);
throw new InternalErrorException(e);
}
}

Expand All @@ -203,42 +202,26 @@ public E update(String index, BaseJson json) {
}
}
} catch (IOException e) {
throw new RuntimeException(e);
logger.error(e.getMessage(), e);
throw new InternalErrorException(e);
}
return response;
}

protected BulkProcessor getBulkProcessor(RestHighLevelClient client) {
return getBulkProcessor(client, null);
}

private BulkProcessor getBulkProcessor(RestHighLevelClient client, BulkProcessor.Listener listener) {
if (listener == null) {
listener = new BulkProcessor.Listener() {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
if (response.hasFailures()) {
response.iterator().forEachRemaining(action -> {
if (action.isFailed()) {
logger.error("Error in bulk processing: {}", action.getFailureMessage());
}
});
}
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("Error in bulk processing: ", failure);
BulkProcessor.Builder bpBuilder = BulkProcessor.builder((request, bulkListener) -> {
try {
BulkResponse response = client.bulk(request, RequestOptions.DEFAULT);
if (response.hasFailures()) {
String failure = response.buildFailureMessage();
logger.error("Bulk response error: {}", failure);
throw new InternalErrorException(failure);
}
};
}
BulkProcessor.Builder bpBuilder = BulkProcessor.builder((request, bulkListener) -> client
.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener);
} catch (IOException ioe) {
logger.error(ioe.getMessage(), ioe);
throw new InternalErrorException(ioe);
}
}, getListener());
bpBuilder.setBulkActions(bulkLimit);
bpBuilder.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB));
bpBuilder.setConcurrentRequests(1);
Expand All @@ -247,4 +230,22 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)

return bpBuilder.build();
}

private BulkProcessor.Listener getListener() {
return new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {

}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
throw new InternalErrorException(failure);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.openmbee.mms.core.dao.CommitIndexDAO;
import org.openmbee.mms.core.exceptions.InternalErrorException;
import org.openmbee.mms.elastic.utils.Index;
import org.openmbee.mms.json.BaseJson;
import org.openmbee.mms.json.CommitJson;
Expand Down Expand Up @@ -156,7 +157,8 @@ public List<CommitJson> elementHistory(String nodeId, Set<String> commitIds) {
}
return commits;
} catch (IOException e) {
throw new RuntimeException(e);
logger.error(e.getMessage(), e);
throw new InternalErrorException(e);
}
}

Expand Down Expand Up @@ -186,7 +188,8 @@ private List<CommitJson> getDocs(String commitId) {
}
return rawCommits;
} catch (IOException ioe) {
throw new RuntimeException(ioe);
logger.error(ioe.getMessage(), ioe);
throw new InternalErrorException(ioe);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
Expand All @@ -23,6 +22,7 @@
import org.elasticsearch.search.sort.SortOrder;
import org.openmbee.mms.core.config.ContextHolder;
import org.openmbee.mms.core.dao.NodeIndexDAO;
import org.openmbee.mms.core.exceptions.InternalErrorException;
import org.openmbee.mms.elastic.utils.Index;
import org.openmbee.mms.json.BaseJson;
import org.openmbee.mms.json.ElementJson;
Expand Down Expand Up @@ -82,7 +82,8 @@ public Optional<ElementJson> getByCommitId(String commitId, String nodeId) {
ob.putAll(searchResponse.getHits().getAt(0).getSourceAsMap());
return Optional.of(ob);
} catch (IOException e) {
throw new RuntimeException(e);
logger.error(e.getMessage(), e);
throw new InternalErrorException(e);
}
}

Expand Down Expand Up @@ -112,13 +113,7 @@ private void bulkUpdateRefWithScript(Set<String> docIds, String script) {
request.script(inline);
bulkProcessor.add(request);
}
try {
if (!bulkProcessor.awaitClose(1200L, TimeUnit.SECONDS)) {
logger.error("Timed out in bulk processing");
}
} catch (InterruptedException e) {
logger.error("Index all interrupted: ", e);
}
bulkProcessor.close();
}

@Override
Expand Down Expand Up @@ -150,7 +145,8 @@ public Optional<ElementJson> getElementLessThanOrEqualTimestamp(String nodeId,
}
count += this.termLimit;
} catch (IOException e) {
throw new RuntimeException(e);
logger.error(e.getMessage(), e);
throw new InternalErrorException(e);
}
}
return Optional.empty();
Expand Down
Loading