Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,11 @@ CompletableFuture<PostResult> post(PostKnowledgeInteraction aPKI, RecipientSelec
* {@link SmartConnector} is unpredictable.
*
* Note that a stopped {@link SmartConnector} can no longer be used.
*
* @return a completable future that completes <i>after</i>
* {@link KnowledgeBase#smartConnectorStopped(SmartConnector)} is
* called.
*/
void stop();
CompletableFuture<Void> stop();

}
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,7 @@ public void smartConnectorStopped(SmartConnector aSC) {
LOG.info("canceled inactivity timer for {} because its smart connector stopped.",
this.getKnowledgeBaseId());
}
LOG.debug("SC of KB '{}' has stopped.", this.knowledgeBaseId);
}

private void cancelAsyncResponse() {
Expand All @@ -743,15 +744,21 @@ public void stop() {
if (this.hasAsyncResponse())
this.cancelAsyncResponse();
this.cancelInactivityTimeout();
this.sc.stop();
try {
this.sc.stop().get();
} catch (InterruptedException | ExecutionException e) {
LOG.error("An error occurred while stopping SC of KB '{}'.", this.knowledgeBaseId);
}
this.cancelAndClearAllHandleRequests();
}

private void cancelAndClearAllHandleRequests() {
List<Integer> cancelledRequests = this.toBeProcessedHandleRequests.stream()
.map(HandleRequest::getHandleRequestId).toList();
LOG.warn("KB with id " + this.knowledgeBaseId
+ " has stopped. The following handle requests will be cancelled: " + cancelledRequests);

if (cancelledRequests.size() > 0)
LOG.warn("KB with id " + this.knowledgeBaseId
+ " has stopped. The following handle requests will be cancelled: " + cancelledRequests);

String cancelMessage = "KB with id " + this.knowledgeBaseId + " will no longer respond, because it stopped.";
this.toBeProcessedHandleRequests.forEach(hr -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public boolean deleteKB(String knowledgeBaseId) {
rkb.stop();
success = true;
this.restKnowledgeBases.remove(knowledgeBaseId);
LOG.info("Removed KB {}", knowledgeBaseId);
LOG.debug("Removed KB {}", knowledgeBaseId);
} catch (RuntimeException e) {
LOG.error("Could not stop knowledge base {}, so it stays in the list of knowledge bases", knowledgeBaseId);
LOG.error("Encountered exception while stopping knowledge base", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import eu.knowledge.engine.rest.api.NotFoundException;
import eu.knowledge.engine.rest.model.ResponseMessage;
import eu.knowledge.engine.rest.model.SmartConnector;
import eu.knowledge.engine.smartconnector.api.KnowledgeEngineException;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Content;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import eu.knowledge.engine.smartconnector.api.AskPlan;
import eu.knowledge.engine.smartconnector.api.AskResult;
import eu.knowledge.engine.smartconnector.api.BindingSet;
import eu.knowledge.engine.smartconnector.api.GraphPattern;
import eu.knowledge.engine.smartconnector.api.KnowledgeBase;
import eu.knowledge.engine.smartconnector.api.KnowledgeInteraction;
import eu.knowledge.engine.smartconnector.api.PostKnowledgeInteraction;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.net.URI;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.apache.jena.rdf.model.Resource;

Expand Down Expand Up @@ -77,6 +78,6 @@ public interface KnowledgeBaseStore {
URI getMetaId(URI knowledgeBaseId, KnowledgeInteractionInfo.Type kiType, Resource purpose);

Resource getPurpose(URI knowledgeBaseId, URI knowledgeInteractionId);
void stop();

CompletableFuture<Void> stop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
Expand All @@ -20,6 +21,7 @@
import eu.knowledge.engine.smartconnector.api.KnowledgeBase;
import eu.knowledge.engine.smartconnector.api.KnowledgeInteraction;
import eu.knowledge.engine.smartconnector.api.PostKnowledgeInteraction;
import eu.knowledge.engine.smartconnector.api.PostResult;
import eu.knowledge.engine.smartconnector.api.ReactHandler;
import eu.knowledge.engine.smartconnector.api.ReactKnowledgeInteraction;
import eu.knowledge.engine.smartconnector.api.Vocab;
Expand Down Expand Up @@ -322,8 +324,16 @@ public Resource getPurpose(URI knowledgeBaseId, URI knowledgeInteractionId) {
}

@Override
public void stop() {
public CompletableFuture<Void> stop() {
this.LOG.trace("Stopping KnowledgeBaseStoreImpl");
this.listeners.forEach(l -> l.smartConnectorStopping());

Set<CompletableFuture<?>> futures = new HashSet<>();
this.listeners.forEach(l -> {

CompletableFuture<PostResult> future = l.smartConnectorStopping();
futures.add(future);

});
return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]));
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
package eu.knowledge.engine.smartconnector.impl;

import java.util.concurrent.CompletableFuture;

import eu.knowledge.engine.smartconnector.api.PostResult;

public interface KnowledgeBaseStoreListener {

void knowledgeInteractionRegistered(KnowledgeInteractionInfo ki);

void knowledgeInteractionUnregistered(KnowledgeInteractionInfo ki);

/**
* The Knowledge Base has requested the smart connector to stop.
*
* @return a future that will complete once all other knowledge bases have been
* notified of our termination.
*/
void smartConnectorStopping();
CompletableFuture<PostResult> smartConnectorStopping();

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -92,29 +94,41 @@ public CompletableFuture<AnswerMessage> sendAskMessage(AskMessage askMessage) th
}
CompletableFuture<AnswerMessage> future = new CompletableFuture<>();

// wait maximally WAIT_TIMEOUT for a return message.
int waitInSeconds = this.getWaitTimeout();
if (waitInSeconds > 0) {
future = future.orTimeout(waitInSeconds, TimeUnit.SECONDS);
}

future.whenComplete((m, e) -> {
this.openAskMessages.remove(askMessage.getMessageId());
if (m == null)
if (e != null)
if (e instanceof TimeoutException)
LOG.error("KB '{}' did not respond within {}s to AskMessage '{}'.",
askMessage.getToKnowledgeBase(), this.getWaitTimeout(), askMessage.getMessageId(), e);
askMessage.getToKnowledgeBase(), this.getWaitTimeout(), askMessage.getMessageId());
else if (e instanceof CancellationException)
LOG.debug("Waiting for AnswerMessage to AskMessage '{}' was cancelled due to a stopping SC.",
askMessage.getMessageId());
else
LOG.error("A {} occurred while sending an AskMessage.", e.getClass().getSimpleName(), e);
else
LOG.error(
"The AnswerMessage future should complete either exceptionally or normally. Not with both AnswerMessage and Exception null.");

this.openAskMessages.remove(askMessage.getMessageId());
});

// wait maximally WAIT_TIMEOUT for a return message.
int waitInSeconds = this.getWaitTimeout();
if (waitInSeconds > 0) {
future = future.orTimeout(waitInSeconds, TimeUnit.SECONDS);
}

// first send and if success add to open messages
this.openAskMessages.put(askMessage.getMessageId(), future);
messageDispatcher.send(askMessage);

try {
messageDispatcher.send(askMessage);
} catch (IOException ioe) {
// cancel future and remove from open messages
this.openAskMessages.remove(askMessage.getMessageId()).cancel(true);
// and re throw
throw ioe;
}

LOG.debug("Sent AskMessage: {}", askMessage);

Expand All @@ -129,28 +143,40 @@ public CompletableFuture<ReactMessage> sendPostMessage(PostMessage postMessage)
}
CompletableFuture<ReactMessage> future = new CompletableFuture<>();

// wait maximally WAIT_TIMEOUT for a return message.
int waitInSeconds = this.getWaitTimeout();
if (waitInSeconds > 0) {
future = future.orTimeout(waitInSeconds, TimeUnit.SECONDS);
}

future.whenComplete((m, e) -> {
this.openPostMessages.remove(postMessage.getMessageId());
if (m == null)
if (e != null)
if (e instanceof TimeoutException)
LOG.error("KB '{}' did not respond within {}s to PostMessage '{}'.",
postMessage.getToKnowledgeBase(), this.getWaitTimeout(), postMessage.getMessageId(), e);
LOG.warn("KB '{}' did not respond within {}s to PostMessage '{}'.",
postMessage.getToKnowledgeBase(), this.getWaitTimeout(), postMessage.getMessageId());
else if (e instanceof CancellationException)
LOG.debug("Waiting for ReactMessage to PostMessage '{}' was cancelled due to a stopping SC.",
postMessage.getMessageId());
else
LOG.error("A {} occurred while sending an PostMessage.", e.getClass().getSimpleName(), e);
else
LOG.error(
"The ReactMessage future should complete either exceptionally or normally. Not with both ReactMessage and Exception null.");
this.openAskMessages.remove(postMessage.getMessageId());
});

// wait maximally WAIT_TIMEOUT for a return message.
int waitInSeconds = this.getWaitTimeout();
if (waitInSeconds > 0) {
future = future.orTimeout(waitInSeconds, TimeUnit.SECONDS);
}

// first send and if success add to open messages
this.openPostMessages.put(postMessage.getMessageId(), future);
messageDispatcher.send(postMessage);

try {
messageDispatcher.send(postMessage);
} catch (IOException ioe) {
// cancel future and remove from open messages
this.openPostMessages.remove(postMessage.getMessageId()).cancel(true);
// and re throw
throw ioe;
}
LOG.debug("Sent PostMessage: {}", postMessage);

return future;
Expand Down Expand Up @@ -303,4 +329,19 @@ public void unsetMessageDispatcher() {
this.smartConnector.communicationInterrupted();
}

public void stop() {
int i = 0;

for (CompletableFuture<AnswerMessage> future : new HashSet<>(this.openAskMessages.values())) {
if (future.cancel(true))
i++;
}

for (CompletableFuture<ReactMessage> future : new HashSet<>(this.openPostMessages.values())) {
if (future.cancel(true))
i++;
}
LOG.debug("MessageRouterImpl stopped. Cancelled {} message(s).", i);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.jena.query.Query;
import org.apache.jena.query.QueryExecution;
Expand Down Expand Up @@ -585,21 +584,20 @@ public void knowledgeInteractionUnregistered(KnowledgeInteractionInfo ki) {
}

@Override
public void smartConnectorStopping() {
try {

var kiInfo = this.knowledgeBaseStore.getKnowledgeInteractionByObject(this.metaPostRemovedKI);
Set<OtherKnowledgeBase> otherKnowledgeBases = this.otherKnowledgeBaseStore.getOtherKnowledgeBases();
// Block on the future, but wait no longer than the timeout.
long timeout = POST_REMOVED_TIMEOUT_MILLIS_PER_OTHERKB
+ otherKnowledgeBases.size() * POST_REMOVED_TIMEOUT_MILLIS_PER_OTHERKB;
LOG.debug("Waiting for max {}ms for other KBs to ack my termination message.", timeout);
this.interactionProcessor.planPostFromKnowledgeBase(kiInfo, new RecipientSelector())
.execute(this.fillMetaBindings(null)).get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
LOG.error("An error occured while informing other KBs about our "
+ "termination. Proceeding to stop the smart connector regardless.", e);
}
public CompletableFuture<PostResult> smartConnectorStopping() {
var kiInfo = this.knowledgeBaseStore.getKnowledgeInteractionByObject(this.metaPostRemovedKI);
Set<OtherKnowledgeBase> otherKnowledgeBases = this.otherKnowledgeBaseStore.getOtherKnowledgeBases();
// Block on the future, but wait no longer than the timeout.
long timeout = POST_REMOVED_TIMEOUT_MILLIS_PER_OTHERKB
+ otherKnowledgeBases.size() * POST_REMOVED_TIMEOUT_MILLIS_PER_OTHERKB;
LOG.debug("Waiting for max {}ms for other KBs to ack my termination message.", timeout);
return this.interactionProcessor.planPostFromKnowledgeBase(kiInfo, new RecipientSelector())
.execute(this.fillMetaBindings(null)).orTimeout(timeout, TimeUnit.MILLISECONDS)
.exceptionally((Throwable e) -> {
LOG.error("An error occured while informing other KBs about our "
+ "termination. Proceeding to stop the smart connector regardless.", e);
return (PostResult) null;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ private void continueReasoningForward(eu.knowledge.engine.reasoner.api.BindingSe
if (aBindingSetHandler != null && aBindingSetHandler.getBindingSet() != null) {
resultBS = aBindingSetHandler.getBindingSet();
}

this.finalBindingSetFuture.complete(resultBS);
} else {
continueReasoningForward(incomingBS, aBindingSetHandler);
Expand Down Expand Up @@ -468,10 +468,12 @@ public CompletableFuture<eu.knowledge.engine.reasoner.api.BindingSet> handle(
});

} catch (IOException e) {
LOG.warn("Error '{}' occurred while sending {}",
LOG.warn(
"Error '{}' should not occur while sending an AskMessage from KB '{}' to KB '{}'. The latter might have been stopped.",
e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName(),
askMessage.getClass().getSimpleName());
LOG.debug("", e);
ReasonerProcessor.this.myKnowledgeInteraction.getKnowledgeBaseId(),
this.kii.getKnowledgeBaseId());
LOG.trace("Detailed error information.", e);
bsFuture = new CompletableFuture<eu.knowledge.engine.reasoner.api.BindingSet>();
bsFuture.complete(new eu.knowledge.engine.reasoner.api.BindingSet());
}
Expand Down Expand Up @@ -558,10 +560,12 @@ public CompletableFuture<eu.knowledge.engine.reasoner.api.BindingSet> handle(
});

} catch (IOException e) {
LOG.warn("Error '{}' occurred while sending {}",
LOG.warn(
"Error '{}' should not occur while sending an PostMessage from KB '{}' to KB '{}'. The latter might have been stopped.",
e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName(),
postMessage.getClass().getSimpleName());
LOG.debug("", e);
ReasonerProcessor.this.myKnowledgeInteraction.getKnowledgeBaseId(),
this.kii.getKnowledgeBaseId());
LOG.trace("Detailed error information.", e);
bsFuture = new CompletableFuture<eu.knowledge.engine.reasoner.api.BindingSet>();
bsFuture.complete(new eu.knowledge.engine.reasoner.api.BindingSet());
}
Expand Down Expand Up @@ -624,7 +628,12 @@ public CompletableFuture<Void> handle(eu.knowledge.engine.reasoner.api.BindingSe
});

} catch (IOException e) {
LOG.error("No errors should occur while sending an PostMessage.", e);
LOG.warn(
"Error '{}' should not occur while sending an PostMessage from KB '{}' to KB '{}'. The latter might have been stopped.",
e.getMessage() != null ? e.getMessage() : e.getClass().getSimpleName(),
ReasonerProcessor.this.myKnowledgeInteraction.getKnowledgeBaseId(),
this.kii.getKnowledgeBaseId());
LOG.trace("Detailed error information.", e);
bsFuture = new CompletableFuture<Void>();
bsFuture.complete((Void) null);
}
Expand Down
Loading
Loading