Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public JwtAuthenticationResponse createAuthenticationToken(

@GetMapping(value = "/authentication")
@PreAuthorize("isAuthenticated()")
public JwtAuthenticationResponse createAuthenticationToken() {
public JwtAuthenticationResponse getAuthenticationToken() {
final Authentication authentication = SecurityContextHolder.getContext().getAuthentication();
if (authentication != null && authentication.getPrincipal() instanceof UserDetails) {
final UserDetails userDetails = (UserDetails) authentication.getPrincipal();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.openmbee.mms.core.services;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;

import org.openmbee.mms.core.objects.ElementsRequest;
Expand All @@ -9,6 +11,8 @@

public interface NodeService {

void readAsStream(String projectId, String refId, Map<String, String> params, OutputStream output, String accept) throws IOException;

ElementsResponse read(String projectId, String refId, String id, Map<String, String> params);

ElementsResponse read(String projectId, String refId, ElementsRequest req, Map<String, String> params);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.openmbee.mms.crud.config;

import java.util.Map;
import org.openmbee.mms.core.exceptions.SdvcException;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
Expand All @@ -16,6 +17,9 @@ public class ExceptionHandlerConfig extends ResponseEntityExceptionHandler {

@ExceptionHandler(value = { SdvcException.class })
protected ResponseEntity<Object> handleSdvcException(SdvcException ex, WebRequest request) {
if (ex.getMessageObject() instanceof String) {
ex.setMessageObject(Map.of("message", ex.getMessageObject()));
}
return handleExceptionInternal(ex, ex.getMessageObject(), new HttpHeaders(), ex.getCode(),
request);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
package org.openmbee.mms.crud.controllers.elements;

import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;

import java.util.Map;

import org.openmbee.mms.core.objects.ElementsRequest;
import org.openmbee.mms.core.objects.ElementsResponse;
import org.openmbee.mms.core.services.CommitService;
import org.openmbee.mms.crud.controllers.BaseController;
import org.openmbee.mms.core.exceptions.BadRequestException;
import org.openmbee.mms.core.services.NodeService;
import org.openmbee.mms.core.pubsub.EmbeddedHookService;
import org.openmbee.mms.crud.hooks.ElementUpdateHook;
import org.openmbee.mms.crud.services.DefaultCommitService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.Authentication;
import org.springframework.web.bind.annotation.DeleteMapping;
Expand All @@ -21,32 +30,51 @@
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

@RestController
@RequestMapping("/projects/{projectId}/refs/{refId}/elements")
@Tag(name = "Elements")
public class ElementsController extends BaseController {

private EmbeddedHookService embeddedHookService;
private CommitService commitService;

@Autowired
public void setEmbeddedHookService(EmbeddedHookService embeddedHookService) {
this.embeddedHookService = embeddedHookService;
}

@Autowired
public void setCommitService(@Qualifier("defaultCommitService")CommitService commitService) {
this.commitService = commitService;
}

@GetMapping
@PreAuthorize("@mss.hasBranchPrivilege(authentication, #projectId, #refId, 'BRANCH_READ', true)")
public ElementsResponse getAllElements(
@ApiResponse(responseCode = "200", content = {
@Content(mediaType = "application/json", schema = @Schema(implementation = ElementsResponse.class)),
@Content(mediaType = "application/x-ndjson")
})
public ResponseEntity<StreamingResponseBody> getAllElements(
@PathVariable String projectId,
@PathVariable String refId,
@RequestParam(required = false) String commitId,
@RequestParam(required = false) Map<String, String> params) {
@RequestParam(required = false) Map<String, String> params,
@Parameter(hidden = true) @RequestHeader(value = "Accept", defaultValue = "application/json") String accept) {

NodeService nodeService = getNodeService(projectId);
return nodeService.read(projectId, refId, "", params);
if (commitId != null && !commitId.isEmpty()) {
commitService.getCommit(projectId, commitId); //check commit exists
}
StreamingResponseBody stream = outputStream -> nodeService.readAsStream(projectId, refId, params, outputStream, accept);
return ResponseEntity.ok()
.header("Content-Type", accept.equals("application/x-ndjson") ? accept : "application/json")
.body(stream);
}

@GetMapping(value = "/{elementId}", produces = MediaType.APPLICATION_JSON_VALUE)
Expand All @@ -58,7 +86,6 @@ public ElementsResponse getElement(
@RequestParam(required = false) String commitId,
@RequestParam(required = false) Map<String, String> params) {


NodeService nodeService = getNodeService(projectId);
ElementsResponse res = nodeService.read(projectId, refId, elementId, params);
handleSingleResponse(res);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,12 @@ public CommitsResponse getRefCommits(String projectId, String refId, Map<String,
public CommitsResponse getCommit(String projectId, String commitId) {
ContextHolder.setContext(projectId);
CommitsResponse res = new CommitsResponse();
try {
Optional<CommitJson> commit = commitIndex.findById(commitId);
if (commit.isPresent()) {
res.getCommits().add(commit.get());
} else {
throw new NotFoundException(res);
}
} catch (Exception e) {
e.printStackTrace();
throw new InternalErrorException(e);

Optional<CommitJson> commit = commitIndex.findById(commitId);
if (commit.isPresent()) {
res.getCommits().add(commit.get());
} else {
throw new NotFoundException("Commit not found");
}
return res;
}
Expand All @@ -111,21 +107,16 @@ public CommitsResponse getCommit(String projectId, String commitId) {
public CommitsResponse getElementCommits(String projectId, String refId, String elementId, Map<String, String> params) {
ContextHolder.setContext(projectId);
CommitsResponse res = new CommitsResponse();
try {
Optional<Branch> ref = branchRepository.findByBranchId(refId);
if (!ref.isPresent()) {
throw new NotFoundException("Branch not found");
}
List<Commit> refCommits = commitRepository.findByRefAndTimestampAndLimit(ref.get(), null, 0);
Set<String> commitIds = new HashSet<>();
for (Commit commit: refCommits) {
commitIds.add(commit.getDocId());
}
res.getCommits().addAll(commitIndex.elementHistory(elementId, commitIds));
} catch (Exception e) {
e.printStackTrace();
throw new InternalErrorException(e);
Optional<Branch> ref = branchRepository.findByBranchId(refId);
if (!ref.isPresent()) {
throw new NotFoundException("Branch not found");
}
List<Commit> refCommits = commitRepository.findByRefAndTimestampAndLimit(ref.get(), null, 0);
Set<String> commitIds = new HashSet<>();
for (Commit commit: refCommits) {
commitIds.add(commit.getDocId());
}
res.getCommits().addAll(commitIndex.elementHistory(elementId, commitIds));
return res;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
package org.openmbee.mms.crud.services;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.*;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.openmbee.mms.core.exceptions.BadRequestException;
import org.openmbee.mms.core.objects.EventObject;
import org.openmbee.mms.core.services.EventService;
import org.openmbee.mms.core.services.NodeChangeInfo;
Expand All @@ -24,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
Expand All @@ -33,6 +44,10 @@
@Service("defaultNodeService")
public class DefaultNodeService implements NodeService {

@Value("${mms.stream.batch.size:100000}")
protected int streamLimit;
protected final ObjectMapper objectMapper = new ObjectMapper();

protected final Logger logger = LoggerFactory.getLogger(getClass());

protected NodeDAO nodeRepository;
Expand Down Expand Up @@ -86,6 +101,50 @@ public void setEventPublisher(Collection<EventService> eventPublisher) {
this.eventPublisher = eventPublisher;
}

@Override
public void readAsStream(String projectId, String refId,
Map<String, String> params, OutputStream stream, String accept) throws IOException {

String commitId = params.getOrDefault("commitId", null);
ContextHolder.setContext(projectId, refId);
List<Node> nodes;
if (commitId != null && !commitId.isEmpty()) {
if (!commitRepository.findByCommitId(commitId).isPresent()) {
throw new BadRequestException("commit id is invalid");
}
nodes = nodeRepository.findAll();
} else {
nodes = nodeRepository.findAllByDeleted(false);
}
String separator = "\n";
if (!"application/x-ndjson".equals(accept)) {
stream.write("{\"elements\":[".getBytes(StandardCharsets.UTF_8));
separator = ",";
}
final String sep = separator;
AtomicInteger counter = new AtomicInteger();
batches(nodes, streamLimit).forEach(ns -> {
try {
if (counter.get() == 0) {
counter.getAndIncrement();
} else {
stream.write(sep.getBytes(StandardCharsets.UTF_8));
}
Collection<ElementJson> result = nodeGetHelper.processGetJsonFromNodes(ns, commitId, this)
.getActiveElementMap().values();
stream.write(result.stream().map(this::toJson).collect(Collectors.joining(sep))
.getBytes(StandardCharsets.UTF_8));
} catch (IOException ioe) {
logger.error("Error writing to stream", ioe);
}
});
if (!"application/x-ndjson".equals(accept)) {
stream.write("]}".getBytes(StandardCharsets.UTF_8));
} else {
stream.write("\n".getBytes(StandardCharsets.UTF_8));
}
}

@Override
public ElementsResponse read(String projectId, String refId, String id,
Map<String, String> params) {
Expand Down Expand Up @@ -244,4 +303,18 @@ private CommitJson createCommit(String creator, String refId, String projectId,
cmjs.setProjectId(projectId);
return cmjs;
}

protected static <T> Stream<List<T>> batches(List<T> source, int length) {
return IntStream.iterate(0, i -> i < source.size(), i -> i + length)
.mapToObj(i -> source.subList(i, Math.min(i + length, source.size())));
}

protected String toJson(ElementJson elementJson) {
try {
return objectMapper.writeValueAsString(elementJson);
} catch (JsonProcessingException e) {
logger.error("Error in toJson: ", e);
}
return "";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,17 @@
@Service
public class NodeGetHelper extends NodeOperation {

public NodeGetInfo processGetJsonFromNodes(List<Node> nodes, NodeService service) {
NodeGetInfo info = initInfoFromNodes(nodes, null);
return processLatest(info, service);
}

public NodeGetInfo processGetJson(List<ElementJson> elements, NodeService service) {
NodeGetInfo info = initInfo(elements, null);
return processLatest(info, service);
}

private NodeGetInfo processLatest(NodeGetInfo info, NodeService service) {
for (String nodeId : info.getReqElementMap().keySet()) {
if (!existingNodeContainsNodeId(info, nodeId)) {
continue;
Expand All @@ -39,19 +47,29 @@ public NodeGetInfo processGetJson(List<ElementJson> elements, NodeService servic
return info;
}

public NodeGetInfo processGetJsonFromNodes(List<Node> nodes, String commitId, NodeService service) {
if (commitId == null || commitId.isEmpty()) {
return processGetJsonFromNodes(nodes, service);
}
NodeGetInfo info = initInfoFromNodes(nodes, null);
return processCommit(info, commitId, service);
}

public NodeGetInfo processGetJson(List<ElementJson> elements, String commitId, NodeService service) {
if (commitId == null || commitId.isEmpty()) {
return processGetJson(elements, service);
}
NodeGetInfo info = initInfo(elements, null); //gets all current nodes
return processCommit(info, commitId, service);
}

private NodeGetInfo processCommit(NodeGetInfo info, String commitId, NodeService service) {
Optional<Commit> commit = commitRepository.findByCommitId(commitId);
if (!commit.isPresent() ) {
throw new BadRequestException("commitId is invalid");
}
Instant time = commit.get().getTimestamp(); //time of commit
List<String> refCommitIds = null; //get it later if needed

NodeGetInfo info = initInfo(elements, null); //gets all current nodes
for (String nodeId : info.getReqElementMap().keySet()) {
if (!existingNodeContainsNodeId(info, nodeId)) { // nodeId not found
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,14 @@ public void initCommitJson(CommitJson cmjs, Instant now) {
cmjs.setType("Commit");
}

public NodeChangeInfo initInfo(List<ElementJson> elements, CommitJson cmjs) {

public NodeChangeInfo initInfoFromNodes(List<Node> existingNodes, CommitJson cmjs) {
Set<String> indexIds = new HashSet<>();
Map<String, ElementJson> reqElementMap = convertJsonToMap(elements);
List<Node> existingNodes = nodeRepository.findAllByNodeIds(reqElementMap.keySet());
Map<String, Node> existingNodeMap = new HashMap<>();
Map<String, ElementJson> reqElementMap = new HashMap<>();
for (Node node : existingNodes) {
indexIds.add(node.getDocId());
existingNodeMap.put(node.getNodeId(), node);
reqElementMap.put(node.getNodeId(), new ElementJson().setId(node.getNodeId()));
}
// bulk read existing elements in elastic
List<ElementJson> existingElements = nodeIndex.findAllById(indexIds);
Expand All @@ -109,6 +108,14 @@ public NodeChangeInfo initInfo(List<ElementJson> elements, CommitJson cmjs) {
return info;
}

public NodeChangeInfo initInfo(List<ElementJson> elements, CommitJson cmjs) {
Map<String, ElementJson> reqElementMap = convertJsonToMap(elements);
List<Node> existingNodes = nodeRepository.findAllByNodeIds(reqElementMap.keySet());
NodeChangeInfo info = initInfoFromNodes(existingNodes, cmjs);
info.setReqElementMap(reqElementMap);
return info;
}

public void processElementAdded(ElementJson e, Node n, NodeChangeInfo info) {
CommitJson cmjs = info.getCommitJson();
processElementAddedOrUpdated(e, n, info);
Expand Down
Loading