diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/ReasonerProcessor.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/ReasonerProcessor.java index 6ec813737..5f29d239c 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/ReasonerProcessor.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/ReasonerProcessor.java @@ -292,9 +292,10 @@ private void continueReasoningForward(eu.knowledge.engine.reasoner.api.BindingSe LOG.debug("All post tasks finished."); if (isComplete) { eu.knowledge.engine.reasoner.api.BindingSet resultBS = new eu.knowledge.engine.reasoner.api.BindingSet(); - if (aBindingSetHandler != null) { + if (aBindingSetHandler != null && aBindingSetHandler.getBindingSet() != null) { resultBS = aBindingSetHandler.getBindingSet(); } + this.finalBindingSetFuture.complete(resultBS); } else { continueReasoningForward(incomingBS, aBindingSetHandler); @@ -365,7 +366,7 @@ public CompletableFuture handle(eu.knowledge.engine.reasoner.api.BindingSe future.handle((r, e) -> { if (e != null) { - LOG.error("An exception has occured while capturing binging set", e); + LOG.error("An exception has occured while capturing binding set", e); return null; } else { return r; @@ -491,7 +492,7 @@ private S createFailedR incomingMessage.getFromKnowledgeInteraction(), incomingMessage.getMessageId(), failedMessage); } else if (incomingMessage instanceof PostMessage) { - outgoingMessage = (S) new AnswerMessage(incomingMessage.getToKnowledgeBase(), + outgoingMessage = (S) new ReactMessage(incomingMessage.getToKnowledgeBase(), incomingMessage.getToKnowledgeInteraction(), incomingMessage.getFromKnowledgeBase(), incomingMessage.getFromKnowledgeInteraction(), incomingMessage.getMessageId(), failedMessage); } @@ -604,12 +605,20 @@ public CompletableFuture handle(eu.knowledge.engine.reasoner.api.BindingSe .sendPostMessage(postMessage); Instant aPreviousSend = Instant.now(); bsFuture = sendPostMessage.exceptionally((Throwable t) -> { - LOG.error("A problem occurred while handling a bindingset.", t); - return null; // TODO when some error happens, what do we return? + String failedMessage = MessageFormatter + .basicArrayFormat("Error '{}' occurred while waiting for response to message: {}", + new String[] { + t.getMessage() != null ? t.getMessage() : t.getClass().getSimpleName(), + postMessage.getMessageId().toString() }); + LOG.warn(failedMessage); + LOG.debug("", t); + return ReasonerProcessor.this + .createFailedResponseMessageFromRequestMessage(postMessage, + failedMessage); }).thenApply((reactMessage) -> { - - ReasonerProcessor.this.postExchangeInfos - .add(convertMessageToExchangeInfo(newBS, new BindingSet(), reactMessage, aPreviousSend)); + assert reactMessage != null; + ReasonerProcessor.this.postExchangeInfos.add( + convertMessageToExchangeInfo(newBS, reactMessage.getResult(), reactMessage, aPreviousSend)); return (Void) null; }); diff --git a/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TestPostReact4.java b/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TestPostReact4.java new file mode 100644 index 000000000..cf26c62ab --- /dev/null +++ b/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TestPostReact4.java @@ -0,0 +1,115 @@ +package eu.knowledge.engine.smartconnector.api; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.Iterator; +import java.util.concurrent.ExecutionException; + +import org.apache.jena.shared.PrefixMapping; +import org.apache.jena.sparql.graph.PrefixMappingMem; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.knowledge.engine.smartconnector.util.KnowledgeNetwork; +import eu.knowledge.engine.smartconnector.util.KnowledgeBaseImpl; + +public class TestPostReact4 { + private static KnowledgeBaseImpl kb1; + private static KnowledgeBaseImpl kb2; + + public boolean kb2Received = false; + + private static final Logger LOG = LoggerFactory.getLogger(TestPostReact4.class); + + @Test + public void testPostReactTimeout() throws InterruptedException { + System.setProperty(SmartConnectorConfig.CONF_KEY_KE_KB_WAIT_TIMEOUT, "1"); + PrefixMappingMem prefixes = new PrefixMappingMem(); + prefixes.setNsPrefixes(PrefixMapping.Standard); + prefixes.setNsPrefix("ex", "https://www.tno.nl/example/"); + + KnowledgeNetwork kn = new KnowledgeNetwork(); + kb1 = new KnowledgeBaseImpl("kb1"); + kn.addKB(kb1); + kb2 = new KnowledgeBaseImpl("kb2"); + kn.addKB(kb2); + + GraphPattern gp1 = new GraphPattern(prefixes, "?a ?c."); + PostKnowledgeInteraction pKI = new PostKnowledgeInteraction(new CommunicativeAct(), gp1, null); + kb1.register(pKI); + + GraphPattern gp2 = new GraphPattern(prefixes, "?d ?e."); + ReactKnowledgeInteraction rKI = new ReactKnowledgeInteraction(new CommunicativeAct(), gp2, null); + kb2.register(rKI, ((anRKI, aReactExchangeInfo) -> { + LOG.trace("KB2 reacting..."); + TestPostReact4.this.kb2Received = true; + var argument = aReactExchangeInfo.getArgumentBindings(); + Iterator iter = argument.iterator(); + assertTrue(iter.hasNext(), "There should be at least a single binding."); + Binding b = iter.next(); + + assertEquals("", b.get("d"), "Binding of 'd' is incorrect."); + assertEquals("", b.get("e"), "Binding of 'e' is incorrect."); + + assertFalse(iter.hasNext(), "This BindingSet should only have a single binding."); + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + LOG.error("{}", e); + } + + return new BindingSet(); + })); + + kn.sync(); + LOG.info("Everyone is up-to-date!"); + + BindingSet bindingSet = new BindingSet(); + Binding binding = new Binding(); + binding.put("a", ""); + binding.put("c", ""); + bindingSet.add(binding); + + try { + PostResult result = kb1.post(pKI, bindingSet).get(); + + assertTrue(this.kb2Received, "KB2 should have received the posted data."); + + assertEquals(result.getExchangeInfoPerKnowledgeBase().iterator().next().getStatus(), + ExchangeInfo.Status.FAILED); + + BindingSet bs = result.getBindings(); + assertTrue(bs.isEmpty()); + + LOG.info("After post!"); + } catch (ExecutionException e) { + LOG.error("Error", e); + fail(); + } + } + + @AfterAll + public static void cleanup() { + System.clearProperty(SmartConnectorConfig.CONF_KEY_KE_KB_WAIT_TIMEOUT); + LOG.info("Clean up: {}", TestPostReact4.class.getSimpleName()); + if (kb1 != null) { + kb1.stop(); + } else { + fail("KB1 should not be null!"); + } + + if (kb2 != null) { + + kb2.stop(); + } else { + fail("KB2 should not be null!"); + } + } + +} diff --git a/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TestPostReact5.java b/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TestPostReact5.java new file mode 100644 index 000000000..80acffffb --- /dev/null +++ b/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TestPostReact5.java @@ -0,0 +1,123 @@ +package eu.knowledge.engine.smartconnector.api; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import java.util.Iterator; +import java.util.concurrent.ExecutionException; + +import org.apache.jena.shared.PrefixMapping; +import org.apache.jena.sparql.graph.PrefixMappingMem; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import eu.knowledge.engine.smartconnector.util.KnowledgeNetwork; +import eu.knowledge.engine.smartconnector.util.KnowledgeBaseImpl; + +public class TestPostReact5 { + private static KnowledgeBaseImpl kb1; + private static KnowledgeBaseImpl kb2; + + public boolean kb2Received = false; + + private static final Logger LOG = LoggerFactory.getLogger(TestPostReact5.class); + + @Test + public void testPostReactTimeout() throws InterruptedException { + System.setProperty(SmartConnectorConfig.CONF_KEY_KE_KB_WAIT_TIMEOUT, "1"); + PrefixMappingMem prefixes = new PrefixMappingMem(); + prefixes.setNsPrefixes(PrefixMapping.Standard); + prefixes.setNsPrefix("ex", "https://www.tno.nl/example/"); + + KnowledgeNetwork kn = new KnowledgeNetwork(); + kb1 = new KnowledgeBaseImpl("kb1"); + kn.addKB(kb1); + kb2 = new KnowledgeBaseImpl("kb2"); + kn.addKB(kb2); + + GraphPattern gp1 = new GraphPattern(prefixes, "?a ?c."); + GraphPattern gp2 = new GraphPattern(prefixes, "?a ?c."); + PostKnowledgeInteraction pKI = new PostKnowledgeInteraction(new CommunicativeAct(), gp1, gp2); + kb1.register(pKI); + + GraphPattern gp3 = new GraphPattern(prefixes, "?d ?e."); + GraphPattern gp4 = new GraphPattern(prefixes, "?d ?e."); + ReactKnowledgeInteraction rKI = new ReactKnowledgeInteraction(new CommunicativeAct(), gp3, gp4); + kb2.register(rKI, ((anRKI, aReactExchangeInfo) -> { + LOG.trace("KB2 reacting..."); + TestPostReact5.this.kb2Received = true; + var argument = aReactExchangeInfo.getArgumentBindings(); + Iterator iter = argument.iterator(); + assertTrue(iter.hasNext(), "There should be at least a single binding."); + Binding b = iter.next(); + + assertEquals("", b.get("d"), "Binding of 'd' is incorrect."); + assertEquals("", b.get("e"), "Binding of 'e' is incorrect."); + + assertFalse(iter.hasNext(), "This BindingSet should only have a single binding."); + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + LOG.error("{}", e); + } + + BindingSet bindingSet = new BindingSet(); + Binding bind = new Binding(); + bind.put("d", b.get("d")); + bind.put("e", b.get("e")); + bindingSet.add(bind); + + return bindingSet; + })); + + kn.sync(); + LOG.info("Everyone is up-to-date!"); + + BindingSet bindingSet = new BindingSet(); + Binding binding = new Binding(); + binding.put("a", ""); + binding.put("c", ""); + bindingSet.add(binding); + + try { + PostResult result = kb1.post(pKI, bindingSet).get(); + + assertTrue(this.kb2Received, "KB2 should have received the posted data."); + + assertEquals(result.getExchangeInfoPerKnowledgeBase().iterator().next().getStatus(), + ExchangeInfo.Status.FAILED); + + BindingSet bs = result.getBindings(); + assertTrue(bs.isEmpty()); + + LOG.info("After post!"); + } catch (ExecutionException e) { + LOG.error("Error", e); + fail(); + } + } + + @AfterAll + public static void cleanup() { + System.clearProperty(SmartConnectorConfig.CONF_KEY_KE_KB_WAIT_TIMEOUT); + LOG.info("Clean up: {}", TestPostReact5.class.getSimpleName()); + if (kb1 != null) { + kb1.stop(); + } else { + fail("KB1 should not be null!"); + } + + if (kb2 != null) { + + kb2.stop(); + } else { + fail("KB2 should not be null!"); + } + } + +}