From bdb57c9915c1f387a10ae004b4388b13a25e8b3e Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Tue, 23 Jan 2024 21:00:49 +0100 Subject: [PATCH 01/19] Change log level and remove DependencyReducedPom --- admin-ui/pom.xml | 1 + knowledge-directory/pom.xml | 1 + smart-connector-rest-dist/pom.xml | 1 + .../smartconnector/runtime/messaging/RemoteKerConnection.java | 4 ++-- 4 files changed, 5 insertions(+), 2 deletions(-) diff --git a/admin-ui/pom.xml b/admin-ui/pom.xml index 125b055f2..edbbdc894 100644 --- a/admin-ui/pom.xml +++ b/admin-ui/pom.xml @@ -200,6 +200,7 @@ true with-dependencies + false diff --git a/knowledge-directory/pom.xml b/knowledge-directory/pom.xml index 6311e90f2..afcfb545f 100644 --- a/knowledge-directory/pom.xml +++ b/knowledge-directory/pom.xml @@ -229,6 +229,7 @@ true with-dependencies + false diff --git a/smart-connector-rest-dist/pom.xml b/smart-connector-rest-dist/pom.xml index f7b15743c..2790e08d6 100644 --- a/smart-connector-rest-dist/pom.xml +++ b/smart-connector-rest-dist/pom.xml @@ -64,6 +64,7 @@ true with-dependencies + false diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java index 734e18358..5f633b2b8 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java @@ -240,7 +240,7 @@ public void stop() { LOG.debug("", e); } } else - LOG.info("Still ignoring KER {}.", this.remoteKerUri); + LOG.warn("Still ignoring KER {}.", this.remoteKerUri); // if someone calls this stop method, all smart connectors should be removed // from the other knowledge base store. We do this by removing the ker details @@ -316,7 +316,7 @@ public void sendMyKerDetailsToPeer(KnowledgeEngineRuntimeDetails details) { LOG.debug("", e); } } else - LOG.info("Still ignoring KER {}.", this.remoteKerUri); + LOG.warn("Still ignoring KER {}.", this.remoteKerUri); } private String getPathForMessageType(KnowledgeMessage message) { From 3d7c612db9ec80015d0d1973776c25eab397d0e6 Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Sat, 3 Feb 2024 21:45:49 +0100 Subject: [PATCH 02/19] Robustness improvements. Done: - remove unnecessary exceptions from log - make logging smarter/more comprihensible - introduce different timeouts --- examples/unreachable-runtimes/readme.md | 6 +++ .../impl/MessageRouterImpl.java | 15 ++++++ .../impl/MetaKnowledgeBaseImpl.java | 8 --- .../impl/OtherKnowledgeBaseStoreImpl.java | 2 +- .../messaging/RemoteKerConnection.java | 51 ++++++++++++------- 5 files changed, 55 insertions(+), 27 deletions(-) diff --git a/examples/unreachable-runtimes/readme.md b/examples/unreachable-runtimes/readme.md index e5d1fc0f2..118321c23 100644 --- a/examples/unreachable-runtimes/readme.md +++ b/examples/unreachable-runtimes/readme.md @@ -44,4 +44,10 @@ iptables-legacy -P INPUT DROP iptables-legacy -P INPUT ACCEPT #runtime-3 is now reachable again for other KERs and can also reach the KD and other KERs. +``` + +Another scenario that you can check is when other KERs can access runtime-3, but it cannot send back a response to runtime-1. To do this, use the following filewall rule: + +``` +iptables-legacy -A OUTPUT -p tcp -d runtime-1 -m state --state NEW -j DROP ``` \ No newline at end of file diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java index a63ec497f..84af1a619 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java @@ -8,6 +8,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -29,6 +30,8 @@ public class MessageRouterImpl implements MessageRouter, SmartConnectorEndpoint */ private static final int MAX_ENTRIES = 5000; + private static final int WAIT_TIMEOUT = 30; + private final SmartConnectorImpl smartConnector; private final Map> openAskMessages = Collections .synchronizedMap(new LinkedHashMap>() { @@ -84,6 +87,12 @@ public CompletableFuture sendAskMessage(AskMessage askMessage) th throw new IOException("Not connected to MessageDispatcher"); } CompletableFuture future = new CompletableFuture<>(); + + // wait maximally WAIT_TIMEOUT for a return message. + future.orTimeout(WAIT_TIMEOUT, TimeUnit.SECONDS).whenComplete((m, e) -> { + this.openAskMessages.remove(askMessage.getMessageId()); + }); + this.openAskMessages.put(askMessage.getMessageId(), future); messageDispatcher.send(askMessage); @@ -99,6 +108,12 @@ public CompletableFuture sendPostMessage(PostMessage postMessage) throw new IOException("Not connected to MessageDispatcher"); } CompletableFuture future = new CompletableFuture<>(); + + // wait maximally WAIT_TIMEOUT for a return message. + future.orTimeout(WAIT_TIMEOUT, TimeUnit.SECONDS).whenComplete((m, e) -> { + this.openAskMessages.remove(postMessage.getMessageId()); + }); + this.openPostMessages.put(postMessage.getMessageId(), future); messageDispatcher.send(postMessage); LOG.debug("Sent PostMessage: {}", postMessage); diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MetaKnowledgeBaseImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MetaKnowledgeBaseImpl.java index 78cd13fc3..6f54b5176 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MetaKnowledgeBaseImpl.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MetaKnowledgeBaseImpl.java @@ -340,14 +340,6 @@ public CompletableFuture getOtherKnowledgeBase(URI toKnowled // condition that otherKnowledgeBase should NEVER be null. return null; } - }).handle((r, e) -> { - - if (r == null) { - LOG.error("An exception has occured while getting Other Knowledge Base", e); - return null; - } else { - return r; - } }); return future; } catch (IOException e) { diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/OtherKnowledgeBaseStoreImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/OtherKnowledgeBaseStoreImpl.java index 780d2f9b5..08bff0b1f 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/OtherKnowledgeBaseStoreImpl.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/OtherKnowledgeBaseStoreImpl.java @@ -73,7 +73,7 @@ public CompletableFuture populate() { }).handle((r, e) -> { if (r == null && e != null) { - LOG.error("An exception has occured while adding an other Knowledge Base ", e); + LOG.debug("An exception has occured while adding {} an other Knowledge Base ", id, e); return null; } else { return r; diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java index 5f633b2b8..0f548838a 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java @@ -46,7 +46,7 @@ public class RemoteKerConnection { /** * A maximum amount of time to wait for othe HTTP REST call to fail/succeed. */ - private static final int HTTP_TIMEOUT = 30; + private static final int HTTP_TIMEOUT = 10; public static final Logger LOG = LoggerFactory.getLogger(RemoteKerConnection.class); @@ -60,6 +60,7 @@ public class RemoteKerConnection { private LocalDateTime tryAgainAfter = null; private int errorCounter = 0; + private LocalDateTime logStillIgnoringAfter = null; public RemoteKerConnection(MessageDispatcher dispatcher, KnowledgeEngineRuntimeConnectionDetails kerConnectionDetails) { @@ -103,6 +104,7 @@ public URI getRemoteKerUri() { private void noError() { this.errorCounter = 0; this.tryAgainAfter = null; + this.logStillIgnoringAfter = null; } private int errorOccurred() { @@ -119,7 +121,8 @@ private int errorOccurred() { private void updateRemoteKerDataFromPeer() { try { HttpRequest request = HttpRequest.newBuilder(new URI(this.remoteKerUri + "/runtimedetails")) - .header("Content-Type", "application/json").version(Version.HTTP_1_1).GET().build(); + .header("Content-Type", "application/json").timeout(Duration.ofSeconds(HTTP_TIMEOUT)) + .version(Version.HTTP_1_1).GET().build(); HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString()); if (response.statusCode() == 200) { @@ -153,6 +156,7 @@ private boolean isAvailable() { boolean after = LocalDateTime.now().isAfter(tryAgainAfter); if (after) { LOG.info("KER {} available again.", this.remoteKerUri); + this.tryAgainAfter = null; } return after; } else @@ -226,7 +230,8 @@ public void stop() { HttpRequest request = HttpRequest .newBuilder(new URI(this.remoteKerUri + "/runtimedetails/" + dispatcher.getKnowledgeDirectoryConnectionManager().getMyKnowledgeDirectoryId())) - .header("Content-Type", "application/json").version(Version.HTTP_1_1).DELETE().build(); + .header("Content-Type", "application/json").timeout(Duration.ofSeconds(HTTP_TIMEOUT)) + .version(Version.HTTP_1_1).DELETE().build(); HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString()); if (response.statusCode() == 200) { @@ -240,7 +245,7 @@ public void stop() { LOG.debug("", e); } } else - LOG.warn("Still ignoring KER {}.", this.remoteKerUri); + logStillIgnoring(); // if someone calls this stop method, all smart connectors should be removed // from the other knowledge base store. We do this by removing the ker details @@ -249,6 +254,16 @@ public void stop() { dispatcher.notifySmartConnectorsChanged(); } + /** + * To prevent many "Still ignoring" messages, we only log them once a minute. + */ + private void logStillIgnoring() { + if (logStillIgnoringAfter == null || logStillIgnoringAfter.isBefore(LocalDateTime.now())) { + LOG.warn("Still ignoring KER {}.", this.remoteKerUri); + logStillIgnoringAfter = LocalDateTime.now().plusMinutes(1); + } + } + public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOException { assert (getRemoteKerDetails() == null ? true : getRemoteKerDetails().getSmartConnectorIds().contains(message.getToKnowledgeBase().toString())); @@ -259,8 +274,8 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept String jsonMessage = objectMapper.writeValueAsString(MessageConverter.toJson(message)); HttpRequest request = HttpRequest .newBuilder(new URI(this.remoteKerUri + getPathForMessageType(message))) - .header("Content-Type", "application/json").version(Version.HTTP_1_1) - .POST(BodyPublishers.ofString(jsonMessage)).build(); + .header("Content-Type", "application/json").timeout(Duration.ofSeconds(HTTP_TIMEOUT)) + .version(Version.HTTP_1_1).POST(BodyPublishers.ofString(jsonMessage)).build(); HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString()); @@ -285,7 +300,7 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept throw e; } } else { - LOG.warn("Still ignoring KER {}.", this.remoteKerUri); + logStillIgnoring(); throw new IOException("KER " + this.remoteKerUri + " is currently unavailable. Trying again later."); } } @@ -295,28 +310,28 @@ public void sendMyKerDetailsToPeer(KnowledgeEngineRuntimeDetails details) { try { String jsonMessage = objectMapper.writeValueAsString(details); HttpRequest request = HttpRequest.newBuilder(new URI(this.remoteKerUri + "/runtimedetails")) - .header("Content-Type", "application/json").version(Version.HTTP_1_1) - .POST(BodyPublishers.ofString(jsonMessage)).build(); + .header("Content-Type", "application/json").timeout(Duration.ofSeconds(HTTP_TIMEOUT)) + .version(Version.HTTP_1_1).POST(BodyPublishers.ofString(jsonMessage)).build(); HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString()); if (response.statusCode() == 200) { this.noError(); LOG.trace("Successfully sent updated KnowledgeEngineRuntimeDetails to {}", this.remoteKerUri); } else { - this.remoteKerDetails = null; - this.errorOccurred(); - LOG.warn("Failed to send updated KnowledgeEngineRuntimeDetails to {}, got response {}: {}", - this.remoteKerUri, response.statusCode(), response.body()); + int time = this.errorOccurred(); + LOG.warn( + "Ignoring KER {} for {} minutes. Failed to send updated KnowledgeEngineRuntimeDetails, got response {}: {}", + this.remoteKerUri, time, response.statusCode(), response.body()); } } catch (IOException | URISyntaxException | InterruptedException e) { - this.remoteKerDetails = null; - this.errorOccurred(); - LOG.warn("Failed to send updated KnowledgeEngineRuntimeDetails to " - + remoteKerConnectionDetails.getId()); + int time = this.errorOccurred(); + LOG.warn( + "Ignoring KER {} for {} minutes. Failed to send updated KnowledgeEngineRuntimeDetails due to '{}'", + this.remoteKerUri, time, e.getMessage()); LOG.debug("", e); } } else - LOG.warn("Still ignoring KER {}.", this.remoteKerUri); + logStillIgnoring(); } private String getPathForMessageType(KnowledgeMessage message) { From 15646ad6cbb81a765be13697383ff9cce67e115f Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Tue, 13 Feb 2024 11:15:48 +0100 Subject: [PATCH 03/19] Remove all smart connectors when KER is unavailable. --- .../runtime/messaging/RemoteKerConnection.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java index 0f548838a..e11977f88 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java @@ -237,10 +237,12 @@ public void stop() { if (response.statusCode() == 200) { LOG.trace("Successfully said goodbye to {}", this.remoteKerUri); } else { + this.remoteKerDetails = null; LOG.warn("Failed to say goodbye to {}, got response {}: {}", this.remoteKerUri, response.statusCode(), response.body()); } } catch (IOException | URISyntaxException | InterruptedException e) { + this.remoteKerDetails = null; LOG.warn("Failed to say goodbye to " + remoteKerConnectionDetails.getId()); LOG.debug("", e); } @@ -283,6 +285,7 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept this.noError(); LOG.trace("Successfully sent message {} to {}", message.getMessageId(), this.remoteKerUri); } else { + this.remoteKerDetails = null; int time = this.errorOccurred(); LOG.warn("Ignoring KER {} for {} minutes. Failed to send message {} to {}, got response {}: {}", this.remoteKerUri, time, message.getMessageId(), this.remoteKerUri, response.statusCode(), @@ -291,10 +294,12 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept + ", body " + response.body()); } } catch (JsonProcessingException | URISyntaxException | InterruptedException e) { + this.remoteKerDetails = null; int time = this.errorOccurred(); LOG.warn("Ignoring KER {} for {} minutes.", this.remoteKerUri, time); throw new IOException("Could not send message to remote SmartConnector.", e); } catch (IOException e) { + this.remoteKerDetails = null; int time = this.errorOccurred(); LOG.warn("Ignoring KER {} for {} minutes.", this.remoteKerUri, time); throw e; @@ -318,12 +323,14 @@ public void sendMyKerDetailsToPeer(KnowledgeEngineRuntimeDetails details) { this.noError(); LOG.trace("Successfully sent updated KnowledgeEngineRuntimeDetails to {}", this.remoteKerUri); } else { + this.remoteKerDetails = null; int time = this.errorOccurred(); LOG.warn( "Ignoring KER {} for {} minutes. Failed to send updated KnowledgeEngineRuntimeDetails, got response {}: {}", this.remoteKerUri, time, response.statusCode(), response.body()); } } catch (IOException | URISyntaxException | InterruptedException e) { + this.remoteKerDetails = null; int time = this.errorOccurred(); LOG.warn( "Ignoring KER {} for {} minutes. Failed to send updated KnowledgeEngineRuntimeDetails due to '{}'", From c9faf1f1762b3e19f8531e6489c7f5937ea72c7e Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Tue, 13 Feb 2024 17:36:15 +0100 Subject: [PATCH 04/19] notify everyone when KER becomes unavailable. --- .../runtime/messaging/RemoteKerConnection.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java index e11977f88..024be4321 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java @@ -293,16 +293,12 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept throw new IOException("Message not accepted by remote host, status code " + response.statusCode() + ", body " + response.body()); } - } catch (JsonProcessingException | URISyntaxException | InterruptedException e) { + } catch (URISyntaxException | InterruptedException | IOException e) { this.remoteKerDetails = null; int time = this.errorOccurred(); LOG.warn("Ignoring KER {} for {} minutes.", this.remoteKerUri, time); - throw new IOException("Could not send message to remote SmartConnector.", e); - } catch (IOException e) { - this.remoteKerDetails = null; - int time = this.errorOccurred(); - LOG.warn("Ignoring KER {} for {} minutes.", this.remoteKerUri, time); - throw e; + this.dispatcher.notifySmartConnectorsChanged(); + throw new IOException(e); } } else { logStillIgnoring(); From 41888131f781176415dbf53002f6da34b7995c85 Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Tue, 23 Jan 2024 21:00:49 +0100 Subject: [PATCH 05/19] Change log level and remove DependencyReducedPom --- admin-ui/pom.xml | 1 + knowledge-directory/pom.xml | 1 + smart-connector-rest-dist/pom.xml | 1 + .../smartconnector/runtime/messaging/RemoteKerConnection.java | 4 ++-- 4 files changed, 5 insertions(+), 2 deletions(-) diff --git a/admin-ui/pom.xml b/admin-ui/pom.xml index 125b055f2..edbbdc894 100644 --- a/admin-ui/pom.xml +++ b/admin-ui/pom.xml @@ -200,6 +200,7 @@ true with-dependencies + false diff --git a/knowledge-directory/pom.xml b/knowledge-directory/pom.xml index 6311e90f2..afcfb545f 100644 --- a/knowledge-directory/pom.xml +++ b/knowledge-directory/pom.xml @@ -229,6 +229,7 @@ true with-dependencies + false diff --git a/smart-connector-rest-dist/pom.xml b/smart-connector-rest-dist/pom.xml index f7b15743c..2790e08d6 100644 --- a/smart-connector-rest-dist/pom.xml +++ b/smart-connector-rest-dist/pom.xml @@ -64,6 +64,7 @@ true with-dependencies + false diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java index 734e18358..5f633b2b8 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java @@ -240,7 +240,7 @@ public void stop() { LOG.debug("", e); } } else - LOG.info("Still ignoring KER {}.", this.remoteKerUri); + LOG.warn("Still ignoring KER {}.", this.remoteKerUri); // if someone calls this stop method, all smart connectors should be removed // from the other knowledge base store. We do this by removing the ker details @@ -316,7 +316,7 @@ public void sendMyKerDetailsToPeer(KnowledgeEngineRuntimeDetails details) { LOG.debug("", e); } } else - LOG.info("Still ignoring KER {}.", this.remoteKerUri); + LOG.warn("Still ignoring KER {}.", this.remoteKerUri); } private String getPathForMessageType(KnowledgeMessage message) { From d858dbdef298b591c850de655ceebf2e82b9b1e8 Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Sat, 3 Feb 2024 21:45:49 +0100 Subject: [PATCH 06/19] Robustness improvements. Done: - remove unnecessary exceptions from log - make logging smarter/more comprihensible - introduce different timeouts --- examples/unreachable-runtimes/readme.md | 6 +++ .../impl/MessageRouterImpl.java | 15 ++++++ .../impl/MetaKnowledgeBaseImpl.java | 8 --- .../impl/OtherKnowledgeBaseStoreImpl.java | 2 +- .../messaging/RemoteKerConnection.java | 51 ++++++++++++------- 5 files changed, 55 insertions(+), 27 deletions(-) diff --git a/examples/unreachable-runtimes/readme.md b/examples/unreachable-runtimes/readme.md index e5d1fc0f2..118321c23 100644 --- a/examples/unreachable-runtimes/readme.md +++ b/examples/unreachable-runtimes/readme.md @@ -44,4 +44,10 @@ iptables-legacy -P INPUT DROP iptables-legacy -P INPUT ACCEPT #runtime-3 is now reachable again for other KERs and can also reach the KD and other KERs. +``` + +Another scenario that you can check is when other KERs can access runtime-3, but it cannot send back a response to runtime-1. To do this, use the following filewall rule: + +``` +iptables-legacy -A OUTPUT -p tcp -d runtime-1 -m state --state NEW -j DROP ``` \ No newline at end of file diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java index a63ec497f..84af1a619 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java @@ -8,6 +8,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -29,6 +30,8 @@ public class MessageRouterImpl implements MessageRouter, SmartConnectorEndpoint */ private static final int MAX_ENTRIES = 5000; + private static final int WAIT_TIMEOUT = 30; + private final SmartConnectorImpl smartConnector; private final Map> openAskMessages = Collections .synchronizedMap(new LinkedHashMap>() { @@ -84,6 +87,12 @@ public CompletableFuture sendAskMessage(AskMessage askMessage) th throw new IOException("Not connected to MessageDispatcher"); } CompletableFuture future = new CompletableFuture<>(); + + // wait maximally WAIT_TIMEOUT for a return message. + future.orTimeout(WAIT_TIMEOUT, TimeUnit.SECONDS).whenComplete((m, e) -> { + this.openAskMessages.remove(askMessage.getMessageId()); + }); + this.openAskMessages.put(askMessage.getMessageId(), future); messageDispatcher.send(askMessage); @@ -99,6 +108,12 @@ public CompletableFuture sendPostMessage(PostMessage postMessage) throw new IOException("Not connected to MessageDispatcher"); } CompletableFuture future = new CompletableFuture<>(); + + // wait maximally WAIT_TIMEOUT for a return message. + future.orTimeout(WAIT_TIMEOUT, TimeUnit.SECONDS).whenComplete((m, e) -> { + this.openAskMessages.remove(postMessage.getMessageId()); + }); + this.openPostMessages.put(postMessage.getMessageId(), future); messageDispatcher.send(postMessage); LOG.debug("Sent PostMessage: {}", postMessage); diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MetaKnowledgeBaseImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MetaKnowledgeBaseImpl.java index 78cd13fc3..6f54b5176 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MetaKnowledgeBaseImpl.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MetaKnowledgeBaseImpl.java @@ -340,14 +340,6 @@ public CompletableFuture getOtherKnowledgeBase(URI toKnowled // condition that otherKnowledgeBase should NEVER be null. return null; } - }).handle((r, e) -> { - - if (r == null) { - LOG.error("An exception has occured while getting Other Knowledge Base", e); - return null; - } else { - return r; - } }); return future; } catch (IOException e) { diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/OtherKnowledgeBaseStoreImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/OtherKnowledgeBaseStoreImpl.java index 780d2f9b5..08bff0b1f 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/OtherKnowledgeBaseStoreImpl.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/OtherKnowledgeBaseStoreImpl.java @@ -73,7 +73,7 @@ public CompletableFuture populate() { }).handle((r, e) -> { if (r == null && e != null) { - LOG.error("An exception has occured while adding an other Knowledge Base ", e); + LOG.debug("An exception has occured while adding {} an other Knowledge Base ", id, e); return null; } else { return r; diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java index 5f633b2b8..0f548838a 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java @@ -46,7 +46,7 @@ public class RemoteKerConnection { /** * A maximum amount of time to wait for othe HTTP REST call to fail/succeed. */ - private static final int HTTP_TIMEOUT = 30; + private static final int HTTP_TIMEOUT = 10; public static final Logger LOG = LoggerFactory.getLogger(RemoteKerConnection.class); @@ -60,6 +60,7 @@ public class RemoteKerConnection { private LocalDateTime tryAgainAfter = null; private int errorCounter = 0; + private LocalDateTime logStillIgnoringAfter = null; public RemoteKerConnection(MessageDispatcher dispatcher, KnowledgeEngineRuntimeConnectionDetails kerConnectionDetails) { @@ -103,6 +104,7 @@ public URI getRemoteKerUri() { private void noError() { this.errorCounter = 0; this.tryAgainAfter = null; + this.logStillIgnoringAfter = null; } private int errorOccurred() { @@ -119,7 +121,8 @@ private int errorOccurred() { private void updateRemoteKerDataFromPeer() { try { HttpRequest request = HttpRequest.newBuilder(new URI(this.remoteKerUri + "/runtimedetails")) - .header("Content-Type", "application/json").version(Version.HTTP_1_1).GET().build(); + .header("Content-Type", "application/json").timeout(Duration.ofSeconds(HTTP_TIMEOUT)) + .version(Version.HTTP_1_1).GET().build(); HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString()); if (response.statusCode() == 200) { @@ -153,6 +156,7 @@ private boolean isAvailable() { boolean after = LocalDateTime.now().isAfter(tryAgainAfter); if (after) { LOG.info("KER {} available again.", this.remoteKerUri); + this.tryAgainAfter = null; } return after; } else @@ -226,7 +230,8 @@ public void stop() { HttpRequest request = HttpRequest .newBuilder(new URI(this.remoteKerUri + "/runtimedetails/" + dispatcher.getKnowledgeDirectoryConnectionManager().getMyKnowledgeDirectoryId())) - .header("Content-Type", "application/json").version(Version.HTTP_1_1).DELETE().build(); + .header("Content-Type", "application/json").timeout(Duration.ofSeconds(HTTP_TIMEOUT)) + .version(Version.HTTP_1_1).DELETE().build(); HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString()); if (response.statusCode() == 200) { @@ -240,7 +245,7 @@ public void stop() { LOG.debug("", e); } } else - LOG.warn("Still ignoring KER {}.", this.remoteKerUri); + logStillIgnoring(); // if someone calls this stop method, all smart connectors should be removed // from the other knowledge base store. We do this by removing the ker details @@ -249,6 +254,16 @@ public void stop() { dispatcher.notifySmartConnectorsChanged(); } + /** + * To prevent many "Still ignoring" messages, we only log them once a minute. + */ + private void logStillIgnoring() { + if (logStillIgnoringAfter == null || logStillIgnoringAfter.isBefore(LocalDateTime.now())) { + LOG.warn("Still ignoring KER {}.", this.remoteKerUri); + logStillIgnoringAfter = LocalDateTime.now().plusMinutes(1); + } + } + public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOException { assert (getRemoteKerDetails() == null ? true : getRemoteKerDetails().getSmartConnectorIds().contains(message.getToKnowledgeBase().toString())); @@ -259,8 +274,8 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept String jsonMessage = objectMapper.writeValueAsString(MessageConverter.toJson(message)); HttpRequest request = HttpRequest .newBuilder(new URI(this.remoteKerUri + getPathForMessageType(message))) - .header("Content-Type", "application/json").version(Version.HTTP_1_1) - .POST(BodyPublishers.ofString(jsonMessage)).build(); + .header("Content-Type", "application/json").timeout(Duration.ofSeconds(HTTP_TIMEOUT)) + .version(Version.HTTP_1_1).POST(BodyPublishers.ofString(jsonMessage)).build(); HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString()); @@ -285,7 +300,7 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept throw e; } } else { - LOG.warn("Still ignoring KER {}.", this.remoteKerUri); + logStillIgnoring(); throw new IOException("KER " + this.remoteKerUri + " is currently unavailable. Trying again later."); } } @@ -295,28 +310,28 @@ public void sendMyKerDetailsToPeer(KnowledgeEngineRuntimeDetails details) { try { String jsonMessage = objectMapper.writeValueAsString(details); HttpRequest request = HttpRequest.newBuilder(new URI(this.remoteKerUri + "/runtimedetails")) - .header("Content-Type", "application/json").version(Version.HTTP_1_1) - .POST(BodyPublishers.ofString(jsonMessage)).build(); + .header("Content-Type", "application/json").timeout(Duration.ofSeconds(HTTP_TIMEOUT)) + .version(Version.HTTP_1_1).POST(BodyPublishers.ofString(jsonMessage)).build(); HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString()); if (response.statusCode() == 200) { this.noError(); LOG.trace("Successfully sent updated KnowledgeEngineRuntimeDetails to {}", this.remoteKerUri); } else { - this.remoteKerDetails = null; - this.errorOccurred(); - LOG.warn("Failed to send updated KnowledgeEngineRuntimeDetails to {}, got response {}: {}", - this.remoteKerUri, response.statusCode(), response.body()); + int time = this.errorOccurred(); + LOG.warn( + "Ignoring KER {} for {} minutes. Failed to send updated KnowledgeEngineRuntimeDetails, got response {}: {}", + this.remoteKerUri, time, response.statusCode(), response.body()); } } catch (IOException | URISyntaxException | InterruptedException e) { - this.remoteKerDetails = null; - this.errorOccurred(); - LOG.warn("Failed to send updated KnowledgeEngineRuntimeDetails to " - + remoteKerConnectionDetails.getId()); + int time = this.errorOccurred(); + LOG.warn( + "Ignoring KER {} for {} minutes. Failed to send updated KnowledgeEngineRuntimeDetails due to '{}'", + this.remoteKerUri, time, e.getMessage()); LOG.debug("", e); } } else - LOG.warn("Still ignoring KER {}.", this.remoteKerUri); + logStillIgnoring(); } private String getPathForMessageType(KnowledgeMessage message) { From 3b61c22afc8942de2155c80a7b9fc40ea9f59e57 Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Tue, 13 Feb 2024 11:15:48 +0100 Subject: [PATCH 07/19] Remove all smart connectors when KER is unavailable. --- .../runtime/messaging/RemoteKerConnection.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java index 0f548838a..e11977f88 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java @@ -237,10 +237,12 @@ public void stop() { if (response.statusCode() == 200) { LOG.trace("Successfully said goodbye to {}", this.remoteKerUri); } else { + this.remoteKerDetails = null; LOG.warn("Failed to say goodbye to {}, got response {}: {}", this.remoteKerUri, response.statusCode(), response.body()); } } catch (IOException | URISyntaxException | InterruptedException e) { + this.remoteKerDetails = null; LOG.warn("Failed to say goodbye to " + remoteKerConnectionDetails.getId()); LOG.debug("", e); } @@ -283,6 +285,7 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept this.noError(); LOG.trace("Successfully sent message {} to {}", message.getMessageId(), this.remoteKerUri); } else { + this.remoteKerDetails = null; int time = this.errorOccurred(); LOG.warn("Ignoring KER {} for {} minutes. Failed to send message {} to {}, got response {}: {}", this.remoteKerUri, time, message.getMessageId(), this.remoteKerUri, response.statusCode(), @@ -291,10 +294,12 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept + ", body " + response.body()); } } catch (JsonProcessingException | URISyntaxException | InterruptedException e) { + this.remoteKerDetails = null; int time = this.errorOccurred(); LOG.warn("Ignoring KER {} for {} minutes.", this.remoteKerUri, time); throw new IOException("Could not send message to remote SmartConnector.", e); } catch (IOException e) { + this.remoteKerDetails = null; int time = this.errorOccurred(); LOG.warn("Ignoring KER {} for {} minutes.", this.remoteKerUri, time); throw e; @@ -318,12 +323,14 @@ public void sendMyKerDetailsToPeer(KnowledgeEngineRuntimeDetails details) { this.noError(); LOG.trace("Successfully sent updated KnowledgeEngineRuntimeDetails to {}", this.remoteKerUri); } else { + this.remoteKerDetails = null; int time = this.errorOccurred(); LOG.warn( "Ignoring KER {} for {} minutes. Failed to send updated KnowledgeEngineRuntimeDetails, got response {}: {}", this.remoteKerUri, time, response.statusCode(), response.body()); } } catch (IOException | URISyntaxException | InterruptedException e) { + this.remoteKerDetails = null; int time = this.errorOccurred(); LOG.warn( "Ignoring KER {} for {} minutes. Failed to send updated KnowledgeEngineRuntimeDetails due to '{}'", From af6f4cc4eff45b4024a4cdf448b4a5589547f423 Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Tue, 13 Feb 2024 17:36:15 +0100 Subject: [PATCH 08/19] notify everyone when KER becomes unavailable. --- .../runtime/messaging/RemoteKerConnection.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java index e11977f88..024be4321 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java @@ -293,16 +293,12 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept throw new IOException("Message not accepted by remote host, status code " + response.statusCode() + ", body " + response.body()); } - } catch (JsonProcessingException | URISyntaxException | InterruptedException e) { + } catch (URISyntaxException | InterruptedException | IOException e) { this.remoteKerDetails = null; int time = this.errorOccurred(); LOG.warn("Ignoring KER {} for {} minutes.", this.remoteKerUri, time); - throw new IOException("Could not send message to remote SmartConnector.", e); - } catch (IOException e) { - this.remoteKerDetails = null; - int time = this.errorOccurred(); - LOG.warn("Ignoring KER {} for {} minutes.", this.remoteKerUri, time); - throw e; + this.dispatcher.notifySmartConnectorsChanged(); + throw new IOException(e); } } else { logStillIgnoring(); From f5f9c5f6ffddc81003546dcdc89abbe1fc3567f6 Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Fri, 16 Feb 2024 15:27:34 +0100 Subject: [PATCH 09/19] Improved the timeouts of the KE. Done: - introduced configurable timeout for how long to wait for other KBs - when a KER becomes unavailable, its SCs are no longer taken into account when exchanging data. - when querying the KD, we now also manage unavailable KERs to make sure when they come available again, they are taken into account again. - included the new configuration ENVIRONMENT VARAIBLE in the readme. --- README.md | 3 ++ .../impl/MessageRouterImpl.java | 34 ++++++++++++++++--- .../messaging/RemoteKerConnection.java | 7 ++-- .../messaging/RemoteKerConnectionManager.java | 18 +++++++++- 4 files changed, 54 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 5247f22ca..96cdb41b9 100644 --- a/README.md +++ b/README.md @@ -257,3 +257,6 @@ nohup java -cp "smart-connector-rest-dist-1.2.3.jar:dependency/*" eu.knowledge.e ### Starting the Knowledge Engine in distributed mode The Knowledge Engine can also start in distributed mode, where it connects with a remote knowledge directory and where different instances of the Knowledge Engine (each instance hosting one or more smart connectors) can communicate with each other. More information about starting the Knowledge Engine in distributed mode can be found in the [documentation](docs/04_distributed_mode.md). + +### Increasing the wait time for other KBs to respond +By default, a Smart Connector waits `10` seconds max for a reply from another Smart Connector when sending an ASK/POST message. This time is configurable via the `KE_KB_WAIT_TIMEOUT` environment variable and setting it to `0` means the Smart Connector will wait indefinitely (this can be useful when dealing with Human KBs). diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java index 84af1a619..519afc106 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java @@ -7,7 +7,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -30,7 +29,13 @@ public class MessageRouterImpl implements MessageRouter, SmartConnectorEndpoint */ private static final int MAX_ENTRIES = 5000; - private static final int WAIT_TIMEOUT = 30; + /** + * How many seconds should the MessageRouter wait for ANSWER/REACT Message when + * sending a ASK/POST Message? 0 means wait forever (useful when working with a + * human KB) + */ + private static final String CONF_KEY_WAIT_TIMEOUT = "KE_KB_WAIT_TIMEOUT"; + private static final int DEFAULT_WAIT_TIMEOUT = 10; private final SmartConnectorImpl smartConnector; private final Map> openAskMessages = Collections @@ -80,6 +85,10 @@ public MessageRouterImpl(SmartConnectorImpl smartConnector) { this.smartConnector = smartConnector; } + private int getWaitTimeout() { + return Integer.parseInt(this.getConfigProperty(CONF_KEY_WAIT_TIMEOUT, Integer.toString(DEFAULT_WAIT_TIMEOUT))); + } + @Override public CompletableFuture sendAskMessage(AskMessage askMessage) throws IOException { MessageDispatcherEndpoint messageDispatcher = this.messageDispatcherEndpoint; @@ -89,7 +98,7 @@ public CompletableFuture sendAskMessage(AskMessage askMessage) th CompletableFuture future = new CompletableFuture<>(); // wait maximally WAIT_TIMEOUT for a return message. - future.orTimeout(WAIT_TIMEOUT, TimeUnit.SECONDS).whenComplete((m, e) -> { + future.orTimeout(getWaitTimeout(), TimeUnit.SECONDS).whenComplete((m, e) -> { this.openAskMessages.remove(askMessage.getMessageId()); }); @@ -110,7 +119,7 @@ public CompletableFuture sendPostMessage(PostMessage postMessage) CompletableFuture future = new CompletableFuture<>(); // wait maximally WAIT_TIMEOUT for a return message. - future.orTimeout(WAIT_TIMEOUT, TimeUnit.SECONDS).whenComplete((m, e) -> { + future.orTimeout(getWaitTimeout(), TimeUnit.SECONDS).whenComplete((m, e) -> { this.openAskMessages.remove(postMessage.getMessageId()); }); @@ -139,7 +148,7 @@ public void handleAskMessage(AskMessage message) { try { messageDispatcher.send(reply); } catch (Throwable e) { - this.LOG.warn("Could not send reply to message " + message.getMessageId()); + this.LOG.warn("Could not send reply to message " + message.getMessageId() + ": " + e.getMessage()); this.LOG.debug("", e); } }).handle((r, e) -> { @@ -261,6 +270,21 @@ public URI getKnowledgeBaseId() { return this.smartConnector.getKnowledgeBaseId(); } + public String getConfigProperty(String key, String defaultValue) { + // We might replace this with something a bit more fancy in the future... + String value = System.getenv(key); + if (value == null) { + value = defaultValue; + LOG.trace("No value for the configuration parameter '{}' was provided, using the default value '{}'", key, + defaultValue); + } + return value; + } + + public boolean hasConfigProperty(String key) { + return System.getenv(key) != null; + } + @Override public void setMessageDispatcher(MessageDispatcherEndpoint messageDispatcherEndpoint) { assert this.messageDispatcherEndpoint == null; diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java index 024be4321..24d64d403 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java @@ -46,7 +46,7 @@ public class RemoteKerConnection { /** * A maximum amount of time to wait for othe HTTP REST call to fail/succeed. */ - private static final int HTTP_TIMEOUT = 10; + private static final int HTTP_TIMEOUT = 5; public static final Logger LOG = LoggerFactory.getLogger(RemoteKerConnection.class); @@ -151,7 +151,7 @@ private void updateRemoteKerDataFromPeer() { dispatcher.notifySmartConnectorsChanged(); } - private boolean isAvailable() { + public boolean isAvailable() { if (tryAgainAfter != null) { boolean after = LocalDateTime.now().isAfter(tryAgainAfter); if (after) { @@ -290,6 +290,7 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept LOG.warn("Ignoring KER {} for {} minutes. Failed to send message {} to {}, got response {}: {}", this.remoteKerUri, time, message.getMessageId(), this.remoteKerUri, response.statusCode(), response.body()); + this.dispatcher.notifySmartConnectorsChanged(); throw new IOException("Message not accepted by remote host, status code " + response.statusCode() + ", body " + response.body()); } @@ -321,6 +322,7 @@ public void sendMyKerDetailsToPeer(KnowledgeEngineRuntimeDetails details) { } else { this.remoteKerDetails = null; int time = this.errorOccurred(); + this.dispatcher.notifySmartConnectorsChanged(); LOG.warn( "Ignoring KER {} for {} minutes. Failed to send updated KnowledgeEngineRuntimeDetails, got response {}: {}", this.remoteKerUri, time, response.statusCode(), response.body()); @@ -328,6 +330,7 @@ public void sendMyKerDetailsToPeer(KnowledgeEngineRuntimeDetails details) { } catch (IOException | URISyntaxException | InterruptedException e) { this.remoteKerDetails = null; int time = this.errorOccurred(); + this.dispatcher.notifySmartConnectorsChanged(); LOG.warn( "Ignoring KER {} for {} minutes. Failed to send updated KnowledgeEngineRuntimeDetails due to '{}'", this.remoteKerUri, time, e.getMessage()); diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnectionManager.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnectionManager.java index e750927ee..94b61f04c 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnectionManager.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnectionManager.java @@ -40,6 +40,7 @@ public class RemoteKerConnectionManager extends SmartConnectorManagementApiServi private static final int KNOWLEDGE_DIRECTORY_UPDATE_COOLDOWN = 2; private final RemoteMessageReceiver messageReceiver; private final Map remoteKerConnections = new ConcurrentHashMap<>(); + private final Map unavailableRemoteKerConnections = new ConcurrentHashMap<>(); private ScheduledFuture scheduledScheduleFuture; private ScheduledFuture scheduledKnowledgeDirectoryQueryFuture; private final MessageDispatcher messageDispatcher; @@ -78,7 +79,7 @@ public void scheduleQueryKnowledgeDirectory() { LOG.debug("Scheduling to query the Knowledge Directory right away."); this.scheduledKnowledgeDirectoryQueryFuture = KeRuntime.executorService().schedule(() -> { try { - queryKnowledgeDirectory(); + queryKnowledgeDirectory(); } catch (Throwable t) { LOG.error("", t); } @@ -123,11 +124,26 @@ private synchronized void queryKnowledgeDirectory() { .hasNext();) { Entry e = it.next(); + // deal with unavailable remote kers + if (e.getValue().isAvailable() && this.unavailableRemoteKerConnections.containsKey(e.getKey())) { + // available again so make sure we get its current SCs + this.unavailableRemoteKerConnections.remove(e.getKey()); + e.getValue().getRemoteKerDetails(); + } + + if (!e.getValue().isAvailable()) { + if (!this.unavailableRemoteKerConnections.containsKey(e.getKey())) { + // recently became unavailable + this.unavailableRemoteKerConnections.put(e.getKey(), e.getValue()); + } + } + if (!kerIds.contains(e.getKey())) { // According the the Knowledge Directory, this KER doesn't exist (anymore) LOG.info("Removing peer that is now gone: {}", e.getValue().getRemoteKerUri()); e.getValue().stop(); it.remove(); + this.unavailableRemoteKerConnections.remove(e.getKey()); } } this.knowledgeDirectoryUpdateCooldownEnds = new Date( From 9c21fbdf5ef40948d5c792538c13f55fe2de30f8 Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Fri, 16 Feb 2024 16:12:14 +0100 Subject: [PATCH 10/19] Fix unit test. --- smart-connector-rest-dist/pom.xml | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/smart-connector-rest-dist/pom.xml b/smart-connector-rest-dist/pom.xml index 2790e08d6..d3f747e0d 100644 --- a/smart-connector-rest-dist/pom.xml +++ b/smart-connector-rest-dist/pom.xml @@ -25,15 +25,20 @@ org.junit.jupiter junit-jupiter-api - 5.10.1 + 5.10.2 test org.junit.jupiter junit-jupiter-engine - 5.7.0 + 5.10.2 test + + org.junit.jupiter + junit-jupiter + 5.10.2 + From 06acdb3d75c899c26faddbda290c8b4ae4c74aa3 Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Tue, 20 Feb 2024 13:06:56 +0100 Subject: [PATCH 11/19] Fix ordering issue when starting two runtimes. Issue was: - when we first start our communication, other KERs do not know our KBs yet when we send them a message to announce ourselves. - when we first let other KERs know, our communication is not yet started when we receive (and want to reply) to messages from other KERs. The current version is that we do not immediately send a message to another KER when it contacts us, but we wait for it to announce itself. However, not sure whether this actually works in all scenario's, so we need to test this further or think about it further. --- examples/unreachable-runtimes/readme.md | 2 +- .../api/impl/SmartConnectorLifeCycleApiServiceImpl.java | 6 +++--- .../engine/smartconnector/impl/SmartConnectorImpl.java | 9 ++++++--- .../messaging/LocalSmartConnectorConnectionManager.java | 4 ++-- .../runtime/messaging/RemoteKerConnection.java | 2 +- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/examples/unreachable-runtimes/readme.md b/examples/unreachable-runtimes/readme.md index 118321c23..aa4c9a52f 100644 --- a/examples/unreachable-runtimes/readme.md +++ b/examples/unreachable-runtimes/readme.md @@ -15,7 +15,7 @@ Retrieve the internal IP address of the KB3 (because it needs to always be able Make sure runtime-3 is configured to switch between being reachable to being unreachable. First open a shell for runtime-3. ``` -docker compose exec runtime-3 bash +docker compose exec runtime-3 sh ``` Configure `iptables-legacy` to allow the following packets to go through when we block incoming traffic: diff --git a/smart-connector-rest-server/src/main/java/eu/knowledge/engine/rest/api/impl/SmartConnectorLifeCycleApiServiceImpl.java b/smart-connector-rest-server/src/main/java/eu/knowledge/engine/rest/api/impl/SmartConnectorLifeCycleApiServiceImpl.java index da24fa829..29d73bcc5 100644 --- a/smart-connector-rest-server/src/main/java/eu/knowledge/engine/rest/api/impl/SmartConnectorLifeCycleApiServiceImpl.java +++ b/smart-connector-rest-server/src/main/java/eu/knowledge/engine/rest/api/impl/SmartConnectorLifeCycleApiServiceImpl.java @@ -145,7 +145,9 @@ public void scPost(@Parameter(description = "", required = true) @NotNull @Valid final boolean reasonerEnabled = smartConnector.getReasonerEnabled() == null ? false : smartConnector.getReasonerEnabled(); - + + LOG.info("Creating smart connector with ID {}.", kbId); + // Tell the manager to create a KB, store it, and have it set up a SC etc. this.manager.createKB(new SmartConnector().knowledgeBaseId(kbId.toString()).knowledgeBaseName(kbName) .knowledgeBaseDescription(kbDescription).leaseRenewalTime(smartConnector.getLeaseRenewalTime()) @@ -154,8 +156,6 @@ public void scPost(@Parameter(description = "", required = true) @NotNull @Valid asyncResponse.resume(Response.ok().build()); }); - LOG.info("Creating smart connector with ID {}.", kbId); - return; } diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/SmartConnectorImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/SmartConnectorImpl.java index b8b5f63b4..e7a30e358 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/SmartConnectorImpl.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/SmartConnectorImpl.java @@ -480,14 +480,14 @@ void communicationReady() { Instant beforePopulate = Instant.now(); LOG.info("Getting comms ready took {} ms", Duration.between(this.started, beforePopulate).toMillis()); // Populate the initial knowledge base store. - this.otherKnowledgeBaseStore.populate().thenRun(() -> { + this.otherKnowledgeBaseStore.populate().handle((r, e) -> { LOG.info("Populating took {} ms", Duration.between(beforePopulate, Instant.now()).toMillis()); Instant beforeAnnounce = Instant.now(); // Then tell the other knowledge bases about our existence. - this.metaKnowledgeBase.postNewKnowledgeBase().thenRun(() -> { + this.metaKnowledgeBase.postNewKnowledgeBase().handle((r2, e2) -> { LOG.info("Announcing took {} ms", Duration.between(beforeAnnounce, Instant.now()).toMillis()); Instant beforeConstructorFinished = Instant.now(); - this.constructorFinished.thenRun(() -> { + this.constructorFinished.handle((r3, e3) -> { LOG.info("Constructor finished took {} ms", Duration.between(beforeConstructorFinished, Instant.now()).toMillis()); // When that is done, and all peers have acknowledged our existence, we @@ -500,8 +500,11 @@ void communicationReady() { LOG.error("KnowledgeBase threw exception", t); } }); + return (Void) null; }); + return (Void) null; }); + return (Void) null; }).exceptionally((Throwable t) -> { LOG.error("Populating the Smart Connector should not result in errors.", t); return null; diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/LocalSmartConnectorConnectionManager.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/LocalSmartConnectorConnectionManager.java index 9e595e37c..b269dff0a 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/LocalSmartConnectorConnectionManager.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/LocalSmartConnectorConnectionManager.java @@ -52,11 +52,11 @@ public void smartConnectorAdded(RuntimeSmartConnector smartConnector) { SmartConnectorEndpoint endpoint = smartConnector.getSmartConnectorEndpoint(); LocalSmartConnectorConnection connection = new LocalSmartConnectorConnection(messageDispatcher, endpoint); this.localSmartConnectorConnections.put(endpoint.getKnowledgeBaseId(), connection); - connection.start(); if (messageDispatcher.runsInDistributedMode()) { this.messageDispatcher.getRemoteSmartConnectorConnectionsManager().notifyChangedLocalSmartConnectors(); - this.messageDispatcher.notifySmartConnectorsChanged(); +// this.messageDispatcher.notifySmartConnectorsChanged(); } + connection.start(); } // Remove the LocalSmartConnectorMessageReceiver and detach it diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java index 24d64d403..bbc7ade1c 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java @@ -217,7 +217,7 @@ public boolean representsKnowledgeBase(URI knowledgeBaseId) { public void updateKerDetails(KnowledgeEngineRuntimeDetails kerDetails) { // TODO implement checks? this.remoteKerDetails = kerDetails; - dispatcher.notifySmartConnectorsChanged(); +// dispatcher.notifySmartConnectorsChanged(); } public void start() { From 35c6cace85b994c9ebb290468d788d134aa9fb5b Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Tue, 20 Feb 2024 14:08:08 +0100 Subject: [PATCH 12/19] Check whether the next test already before previous is finished. --- .../api/TestRegisterKnowledgeInteraction.java | 36 +++++++------------ 1 file changed, 12 insertions(+), 24 deletions(-) diff --git a/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/api/TestRegisterKnowledgeInteraction.java b/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/api/TestRegisterKnowledgeInteraction.java index 01804aab8..b36cd8514 100644 --- a/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/api/TestRegisterKnowledgeInteraction.java +++ b/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/api/TestRegisterKnowledgeInteraction.java @@ -6,6 +6,7 @@ import java.net.URL; import java.util.Map; +import org.apache.jena.atlas.logging.Log; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Disabled; @@ -30,40 +31,27 @@ public void setUpServer() { public void testRegisterKi() throws IOException { URL url = new URL("http://localhost:" + PORT + "/rest"); - HttpTester registerKb = new HttpTester(new URL(url + "/sc"), "POST", "{\"knowledgeBaseId\": \"http://example.com/kb\", \"knowledgeBaseName\": \"KB\", \"knowledgeBaseDescription\": \"KB\"}", Map.of( - "Content-Type", "application/json", - "Accept", "*/*" - )); + HttpTester registerKb = new HttpTester(new URL(url + "/sc"), "POST", + "{\"knowledgeBaseId\": \"http://example.com/kb\", \"knowledgeBaseName\": \"KB\", \"knowledgeBaseDescription\": \"KB\"}", + Map.of("Content-Type", "application/json", "Accept", "*/*")); registerKb.expectStatus(200); - HttpTester registerKiWithName = new HttpTester( - new URL(url + "/sc/ki"), - "POST", - "{\"knowledgeInteractionType\": \"AskKnowledgeInteraction\", \"knowledgeInteractionName\": \"some-name\", \"graphPattern\": \"?a ?b ?c.\"}", - Map.of( - "Knowledge-Base-Id", "http://example.com/kb", - "Content-Type", "application/json", - "Accept", "*/*" - ) - ); + HttpTester registerKiWithName = new HttpTester(new URL(url + "/sc/ki"), "POST", + "{\"knowledgeInteractionType\": \"AskKnowledgeInteraction\", \"knowledgeInteractionName\": \"some-name\", \"graphPattern\": \"?a ?b ?c.\"}", + Map.of("Knowledge-Base-Id", "http://example.com/kb", "Content-Type", "application/json", "Accept", + "*/*")); registerKiWithName.expectStatus(200); - HttpTester getKiWithName = new HttpTester( - new URL(url + "/sc/ki"), - "GET", - null, - Map.of( - "Knowledge-Base-Id", "http://example.com/kb", - "Content-Type", "application/json", - "Accept", "*/*" - ) - ); + HttpTester getKiWithName = new HttpTester(new URL(url + "/sc/ki"), "GET", null, Map.of("Knowledge-Base-Id", + "http://example.com/kb", "Content-Type", "application/json", "Accept", "*/*")); var body = getKiWithName.getBody(); assertTrue(body.contains("\"http://example.com/kb/interaction/some-name\"")); } @AfterAll public void cleanUp() { + System.out.println("Start clean up!"); rsh.cleanUp(); + System.out.println("End clean up!"); } } From a1231a35ce352c989f5b39aa96452d5583b00df8 Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Tue, 20 Feb 2024 14:24:35 +0100 Subject: [PATCH 13/19] Make sure our created thread is dead before the main thread continues. --- .../knowledge/engine/rest/RestServerHelper.java | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/RestServerHelper.java b/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/RestServerHelper.java index e3bf04e68..2f42b011e 100644 --- a/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/RestServerHelper.java +++ b/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/RestServerHelper.java @@ -11,14 +11,14 @@ public class RestServerHelper { private static final Logger LOG = LoggerFactory.getLogger(RestServerHelper.class); private static int WAIT_BEFORE_NEXT_POLL = 300; - + private Thread thread; public void start(int port) { var r = new Runnable() { @Override public void run() { - RestServer.main(new String[] {String.format("%d", port)}); + RestServer.main(new String[] { String.format("%d", port) }); } }; this.thread = new Thread(r); @@ -34,15 +34,16 @@ public void run() { } } - public void cleanUp() { + public void cleanUp() throws InterruptedException { thread.interrupt(); + thread.join(); } private static boolean portAvailable(int port) { - try (Socket ignored = new Socket("localhost", port)) { - return false; - } catch (IOException ignored) { - return true; - } + try (Socket ignored = new Socket("localhost", port)) { + return false; + } catch (IOException ignored) { + return true; + } } } From a576deca27d634b5f3c3ad058bcec68cd8962b4b Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Tue, 20 Feb 2024 14:30:26 +0100 Subject: [PATCH 14/19] Possibly fix the build bug. --- .../java/eu/knowledge/engine/rest/RestServerHelper.java | 8 ++++++-- .../engine/rest/api/TestRegisterKnowledgeInteraction.java | 2 -- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/RestServerHelper.java b/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/RestServerHelper.java index 2f42b011e..253ced315 100644 --- a/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/RestServerHelper.java +++ b/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/RestServerHelper.java @@ -34,9 +34,13 @@ public void run() { } } - public void cleanUp() throws InterruptedException { + public void cleanUp() { thread.interrupt(); - thread.join(); + try { + thread.join(); + } catch (InterruptedException e) { + LOG.info("Failed to join thread."); + } } private static boolean portAvailable(int port) { diff --git a/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/api/TestRegisterKnowledgeInteraction.java b/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/api/TestRegisterKnowledgeInteraction.java index b36cd8514..2d2b5c228 100644 --- a/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/api/TestRegisterKnowledgeInteraction.java +++ b/smart-connector-rest-dist/src/test/java/eu/knowledge/engine/rest/api/TestRegisterKnowledgeInteraction.java @@ -50,8 +50,6 @@ public void testRegisterKi() throws IOException { @AfterAll public void cleanUp() { - System.out.println("Start clean up!"); rsh.cleanUp(); - System.out.println("End clean up!"); } } From fec30d065943347fe7b48e170a15995269889e48 Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Fri, 23 Feb 2024 14:00:36 +0100 Subject: [PATCH 15/19] Add configuration properties. Also: - remove some unnecessary http configuration. --- README.md | 9 +++- examples/unreachable-runtimes/readme.md | 2 +- .../impl/MessageRouterImpl.java | 14 +++++- .../messaging/RemoteKerConnection.java | 43 +++++++++++++------ 4 files changed, 51 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 96cdb41b9..097ed8f1a 100644 --- a/README.md +++ b/README.md @@ -258,5 +258,12 @@ nohup java -cp "smart-connector-rest-dist-1.2.3.jar:dependency/*" eu.knowledge.e ### Starting the Knowledge Engine in distributed mode The Knowledge Engine can also start in distributed mode, where it connects with a remote knowledge directory and where different instances of the Knowledge Engine (each instance hosting one or more smart connectors) can communicate with each other. More information about starting the Knowledge Engine in distributed mode can be found in the [documentation](docs/04_distributed_mode.md). -### Increasing the wait time for other KBs to respond +### Additional configuration environment variables + +*Increasing the wait time for other KBs to respond* + By default, a Smart Connector waits `10` seconds max for a reply from another Smart Connector when sending an ASK/POST message. This time is configurable via the `KE_KB_WAIT_TIMEOUT` environment variable and setting it to `0` means the Smart Connector will wait indefinitely (this can be useful when dealing with Human KBs). + +*Increasing the HTTP timeouts* + +By default, a KER waits `5` seconds max for a HTTP response from another KER when sending a message via the inter-KER protocol. The time is configurable via the `KE_HTTP_TIMEOUT` environment variable. \ No newline at end of file diff --git a/examples/unreachable-runtimes/readme.md b/examples/unreachable-runtimes/readme.md index aa4c9a52f..130f1cd8f 100644 --- a/examples/unreachable-runtimes/readme.md +++ b/examples/unreachable-runtimes/readme.md @@ -1,7 +1,7 @@ ## Knowledge Engine's distributed mode test This docker compose project is used to test the Knowledge Engine's behavior in distributed mode when something exceptional happens (i.e. divergence from the happy flow). For example, one participant in the Knowledge Network configured its KER incorrectly and therefore it can reach out, but no one can contact the KER from the outside (via the Inter-KER protocol). Under such circumstances, we want the Knowledge Engine to keep functioning and behave as normal as possible. -To test this, we setup a distributed KER environment with 3 KER+KB combis that exchange data. We have `runtime-1+kb1`, `runtime-2+kb2` and `runtime-3+kb3`. By using the `iptables` tool for `runtime-3` we can simulate a misconfigured KER and test how the other Knowledge Engines behave. Use the following instructions to simulate the misconfigured KER. +To test this, we setup a distributed KER environment with 3 KER+KB combis that exchange data. We have `runtime-1+kb1`, `runtime-2+kb2` and `runtime-3+kb3`. By using the `iptables` tool for `runtime-3` we can simulate a misconfigured KER and test how the other Knowledge Engines behave. Use the following instructions to simulate the misconfigured KER. In the future we might want to use [Awall](https://github.com/alpinelinux/awall) instead of `iptables`. Start the docker compose project: `docker compose up -d` diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java index 519afc106..62d7152ab 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/MessageRouterImpl.java @@ -98,7 +98,12 @@ public CompletableFuture sendAskMessage(AskMessage askMessage) th CompletableFuture future = new CompletableFuture<>(); // wait maximally WAIT_TIMEOUT for a return message. - future.orTimeout(getWaitTimeout(), TimeUnit.SECONDS).whenComplete((m, e) -> { + int waitInSeconds = this.getWaitTimeout(); + if (waitInSeconds > 0) { + future = future.orTimeout(waitInSeconds, TimeUnit.SECONDS); + } + + future.whenComplete((m, e) -> { this.openAskMessages.remove(askMessage.getMessageId()); }); @@ -119,7 +124,12 @@ public CompletableFuture sendPostMessage(PostMessage postMessage) CompletableFuture future = new CompletableFuture<>(); // wait maximally WAIT_TIMEOUT for a return message. - future.orTimeout(getWaitTimeout(), TimeUnit.SECONDS).whenComplete((m, e) -> { + int waitInSeconds = this.getWaitTimeout(); + if (waitInSeconds > 0) { + future = future.orTimeout(waitInSeconds, TimeUnit.SECONDS); + } + + future.whenComplete((m, e) -> { this.openAskMessages.remove(postMessage.getMessageId()); }); diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java index bbc7ade1c..aab811863 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java @@ -8,7 +8,6 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.http.HttpClient; -import java.net.http.HttpClient.Version; import java.net.http.HttpRequest; import java.net.http.HttpRequest.BodyPublishers; import java.net.http.HttpResponse; @@ -21,7 +20,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; @@ -44,9 +42,11 @@ public class RemoteKerConnection { /** - * A maximum amount of time to wait for othe HTTP REST call to fail/succeed. + * How many seconds the HttpClient waits for a HTTP response when sending a HTTP + * request. Default 5 seconds. */ - private static final int HTTP_TIMEOUT = 5; + private static final String CONF_KEY_HTTP_TIMEOUT = "KE_HTTP_TIMEOUT"; + private static final int DEFAULT_HTTP_TIMEOUT = 5; public static final Logger LOG = LoggerFactory.getLogger(RemoteKerConnection.class); @@ -90,13 +90,19 @@ protected PasswordAuthentication getPasswordAuthentication() { this.remoteKerUri = kerConnectionDetails.getExposedUrl(); } - this.httpClient = builder.connectTimeout(Duration.ofSeconds(HTTP_TIMEOUT)).build(); + int httpTimeout = getHttpTimeout(); + + this.httpClient = builder.connectTimeout(Duration.ofSeconds(httpTimeout)).build(); objectMapper = new ObjectMapper().disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS).findAndRegisterModules() .setDateFormat(new RFC3339DateFormat()); } + private int getHttpTimeout() { + return Integer.parseInt(this.getConfigProperty(CONF_KEY_HTTP_TIMEOUT, Integer.toString(DEFAULT_HTTP_TIMEOUT))); + } + public URI getRemoteKerUri() { return this.remoteKerUri; } @@ -121,8 +127,7 @@ private int errorOccurred() { private void updateRemoteKerDataFromPeer() { try { HttpRequest request = HttpRequest.newBuilder(new URI(this.remoteKerUri + "/runtimedetails")) - .header("Content-Type", "application/json").timeout(Duration.ofSeconds(HTTP_TIMEOUT)) - .version(Version.HTTP_1_1).GET().build(); + .header("Content-Type", "application/json").GET().build(); HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString()); if (response.statusCode() == 200) { @@ -230,8 +235,7 @@ public void stop() { HttpRequest request = HttpRequest .newBuilder(new URI(this.remoteKerUri + "/runtimedetails/" + dispatcher.getKnowledgeDirectoryConnectionManager().getMyKnowledgeDirectoryId())) - .header("Content-Type", "application/json").timeout(Duration.ofSeconds(HTTP_TIMEOUT)) - .version(Version.HTTP_1_1).DELETE().build(); + .header("Content-Type", "application/json").DELETE().build(); HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString()); if (response.statusCode() == 200) { @@ -276,8 +280,7 @@ public void sendToRemoteSmartConnector(KnowledgeMessage message) throws IOExcept String jsonMessage = objectMapper.writeValueAsString(MessageConverter.toJson(message)); HttpRequest request = HttpRequest .newBuilder(new URI(this.remoteKerUri + getPathForMessageType(message))) - .header("Content-Type", "application/json").timeout(Duration.ofSeconds(HTTP_TIMEOUT)) - .version(Version.HTTP_1_1).POST(BodyPublishers.ofString(jsonMessage)).build(); + .header("Content-Type", "application/json").POST(BodyPublishers.ofString(jsonMessage)).build(); HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString()); @@ -312,8 +315,7 @@ public void sendMyKerDetailsToPeer(KnowledgeEngineRuntimeDetails details) { try { String jsonMessage = objectMapper.writeValueAsString(details); HttpRequest request = HttpRequest.newBuilder(new URI(this.remoteKerUri + "/runtimedetails")) - .header("Content-Type", "application/json").timeout(Duration.ofSeconds(HTTP_TIMEOUT)) - .version(Version.HTTP_1_1).POST(BodyPublishers.ofString(jsonMessage)).build(); + .header("Content-Type", "application/json").POST(BodyPublishers.ofString(jsonMessage)).build(); HttpResponse response = this.httpClient.send(request, BodyHandlers.ofString()); if (response.statusCode() == 200) { @@ -356,4 +358,19 @@ private String getPathForMessageType(KnowledgeMessage message) { } } + public String getConfigProperty(String key, String defaultValue) { + // We might replace this with something a bit more fancy in the future... + String value = System.getenv(key); + if (value == null) { + value = defaultValue; + LOG.trace("No value for the configuration parameter '{}' was provided, using the default value '{}'", key, + defaultValue); + } + return value; + } + + public boolean hasConfigProperty(String key) { + return System.getenv(key) != null; + } + } From dc07a8ff13b0fc50eb1a62ea89bcbe18de8b777b Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Fri, 23 Feb 2024 14:25:51 +0100 Subject: [PATCH 16/19] Restore example by using latest docker image. --- examples/unreachable-runtimes/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/unreachable-runtimes/Dockerfile b/examples/unreachable-runtimes/Dockerfile index d1360f485..30373ce64 100644 --- a/examples/unreachable-runtimes/Dockerfile +++ b/examples/unreachable-runtimes/Dockerfile @@ -1,4 +1,4 @@ -FROM ghcr.io/tno/knowledge-engine/smart-connector:1.2.4 +FROM ghcr.io/tno/knowledge-engine/smart-connector:1.2.3 USER root From b0d815cb629eb2168eb92d43b228f26fef5f0730 Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Fri, 23 Feb 2024 14:54:59 +0100 Subject: [PATCH 17/19] Example no longer works on latest version 1.2.3. Updated version, but remember to update it to 1.2.4 when releasing. --- examples/unreachable-runtimes/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/unreachable-runtimes/Dockerfile b/examples/unreachable-runtimes/Dockerfile index 30373ce64..3375bb302 100644 --- a/examples/unreachable-runtimes/Dockerfile +++ b/examples/unreachable-runtimes/Dockerfile @@ -1,4 +1,4 @@ -FROM ghcr.io/tno/knowledge-engine/smart-connector:1.2.3 +FROM ghcr.io/tno/knowledge-engine/smart-connector:1.2.4-SNAPSHOT USER root From eadbbe4df4119fcb15b93d040a2e8b18a9ecbdcd Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Fri, 23 Feb 2024 19:48:56 +0100 Subject: [PATCH 18/19] Make SC startup more robust in distributive mode. It now waits with populating and informing other KERs until the communication is ready. Also: - fixed/improved unit tests --- admin-ui/pom.xml | 11 +++++-- knowledge-directory/pom.xml | 18 ++++++++++-- reasoner/pom.xml | 11 +++++-- smart-connector-rest-dist/pom.xml | 1 + smart-connector-rest-server/pom.xml | 10 +++++-- smart-connector/pom.xml | 10 +++++-- .../impl/SmartConnectorImpl.java | 29 ++++++++++--------- .../LocalSmartConnectorConnectionManager.java | 5 ++-- .../smartconnector/api/TimeOntologyTest.java | 9 +++++- 9 files changed, 75 insertions(+), 29 deletions(-) diff --git a/admin-ui/pom.xml b/admin-ui/pom.xml index edbbdc894..f9d97557f 100644 --- a/admin-ui/pom.xml +++ b/admin-ui/pom.xml @@ -115,15 +115,22 @@ org.junit.jupiter junit-jupiter-api - 5.10.1 + 5.10.2 test org.junit.jupiter junit-jupiter-engine - 5.7.0 + 5.10.2 test + + org.junit.jupiter + junit-jupiter + 5.10.2 + test + + eu.knowledge.engine smart-connector-rest-server diff --git a/knowledge-directory/pom.xml b/knowledge-directory/pom.xml index afcfb545f..68dc53f29 100644 --- a/knowledge-directory/pom.xml +++ b/knowledge-directory/pom.xml @@ -44,9 +44,21 @@ compile - junit - junit - ${junit-version} + org.junit.jupiter + junit-jupiter-api + 5.10.2 + test + + + org.junit.jupiter + junit-jupiter-engine + 5.10.2 + test + + + org.junit.jupiter + junit-jupiter + 5.10.2 test diff --git a/reasoner/pom.xml b/reasoner/pom.xml index c83352d45..6bb872642 100644 --- a/reasoner/pom.xml +++ b/reasoner/pom.xml @@ -19,15 +19,22 @@ org.junit.jupiter junit-jupiter-api - 5.10.1 + 5.10.2 test org.junit.jupiter junit-jupiter-engine - 5.7.0 + 5.10.2 test + + org.junit.jupiter + junit-jupiter + 5.10.2 + test + + org.slf4j diff --git a/smart-connector-rest-dist/pom.xml b/smart-connector-rest-dist/pom.xml index d3f747e0d..02c1be7fb 100644 --- a/smart-connector-rest-dist/pom.xml +++ b/smart-connector-rest-dist/pom.xml @@ -38,6 +38,7 @@ org.junit.jupiter junit-jupiter 5.10.2 + test diff --git a/smart-connector-rest-server/pom.xml b/smart-connector-rest-server/pom.xml index 0d7bf5494..eaa85b4ec 100644 --- a/smart-connector-rest-server/pom.xml +++ b/smart-connector-rest-server/pom.xml @@ -120,13 +120,19 @@ org.junit.jupiter junit-jupiter-api - 5.10.1 + 5.10.2 test org.junit.jupiter junit-jupiter-engine - 5.7.0 + 5.10.2 + test + + + org.junit.jupiter + junit-jupiter + 5.10.2 test diff --git a/smart-connector/pom.xml b/smart-connector/pom.xml index 05dd564a3..ffbdd55eb 100644 --- a/smart-connector/pom.xml +++ b/smart-connector/pom.xml @@ -32,13 +32,19 @@ org.junit.jupiter junit-jupiter-api - 5.10.1 + 5.10.2 test org.junit.jupiter junit-jupiter-engine - 5.7.0 + 5.10.2 + test + + + org.junit.jupiter + junit-jupiter + 5.10.2 test diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/SmartConnectorImpl.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/SmartConnectorImpl.java index e7a30e358..74a5efe74 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/SmartConnectorImpl.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/impl/SmartConnectorImpl.java @@ -479,17 +479,18 @@ private void checkStopped() { void communicationReady() { Instant beforePopulate = Instant.now(); LOG.info("Getting comms ready took {} ms", Duration.between(this.started, beforePopulate).toMillis()); - // Populate the initial knowledge base store. - this.otherKnowledgeBaseStore.populate().handle((r, e) -> { - LOG.info("Populating took {} ms", Duration.between(beforePopulate, Instant.now()).toMillis()); - Instant beforeAnnounce = Instant.now(); - // Then tell the other knowledge bases about our existence. - this.metaKnowledgeBase.postNewKnowledgeBase().handle((r2, e2) -> { - LOG.info("Announcing took {} ms", Duration.between(beforeAnnounce, Instant.now()).toMillis()); - Instant beforeConstructorFinished = Instant.now(); - this.constructorFinished.handle((r3, e3) -> { - LOG.info("Constructor finished took {} ms", - Duration.between(beforeConstructorFinished, Instant.now()).toMillis()); + Instant beforeConstructorFinished = Instant.now(); + this.constructorFinished.handle((r3, e3) -> { + LOG.info("Constructor finished took {} ms", + Duration.between(beforeConstructorFinished, Instant.now()).toMillis()); + // Populate the initial knowledge base store. + this.otherKnowledgeBaseStore.populate().handle((r, e) -> { + LOG.info("Populating took {} ms", Duration.between(beforePopulate, Instant.now()).toMillis()); + Instant beforeAnnounce = Instant.now(); + // Then tell the other knowledge bases about our existence. + this.metaKnowledgeBase.postNewKnowledgeBase().handle((r2, e2) -> { + LOG.info("Announcing took {} ms", Duration.between(beforeAnnounce, Instant.now()).toMillis()); + // When that is done, and all peers have acknowledged our existence, we // can proceed to inform the knowledge base that this smart connector is // ready for action! @@ -503,11 +504,11 @@ void communicationReady() { return (Void) null; }); return (Void) null; + }).exceptionally((Throwable t) -> { + LOG.error("Populating the Smart Connector should not result in errors.", t); + return null; }); return (Void) null; - }).exceptionally((Throwable t) -> { - LOG.error("Populating the Smart Connector should not result in errors.", t); - return null; }); } diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/LocalSmartConnectorConnectionManager.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/LocalSmartConnectorConnectionManager.java index b269dff0a..8f75efad1 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/LocalSmartConnectorConnectionManager.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/LocalSmartConnectorConnectionManager.java @@ -52,11 +52,11 @@ public void smartConnectorAdded(RuntimeSmartConnector smartConnector) { SmartConnectorEndpoint endpoint = smartConnector.getSmartConnectorEndpoint(); LocalSmartConnectorConnection connection = new LocalSmartConnectorConnection(messageDispatcher, endpoint); this.localSmartConnectorConnections.put(endpoint.getKnowledgeBaseId(), connection); + connection.start(); + if (messageDispatcher.runsInDistributedMode()) { this.messageDispatcher.getRemoteSmartConnectorConnectionsManager().notifyChangedLocalSmartConnectors(); -// this.messageDispatcher.notifySmartConnectorsChanged(); } - connection.start(); } // Remove the LocalSmartConnectorMessageReceiver and detach it @@ -68,7 +68,6 @@ public void smartConnectorRemoved(RuntimeSmartConnector smartConnector) { if (messageDispatcher.runsInDistributedMode()) { this.messageDispatcher.getRemoteSmartConnectorConnectionsManager().notifyChangedLocalSmartConnectors(); - this.messageDispatcher.notifySmartConnectorsChanged(); } } diff --git a/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TimeOntologyTest.java b/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TimeOntologyTest.java index 6b0989302..974486c27 100644 --- a/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TimeOntologyTest.java +++ b/smart-connector/src/test/java/eu/knowledge/engine/smartconnector/api/TimeOntologyTest.java @@ -1,5 +1,8 @@ package eu.knowledge.engine.smartconnector.api; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + import java.util.Calendar; import java.util.Iterator; import java.util.UUID; @@ -252,7 +255,11 @@ public void test() throws InterruptedException, ExecutionException { AskResult ar = app.ask(aAskKI, new BindingSet()).get(); - LOG.info("Bindings: {}", ar.getBindings()); + BindingSet bindings = ar.getBindings(); + LOG.info("Bindings: {}", bindings); + + assertTrue(bindings.size() > 0); + assertEquals("\"Meeting 2\"", bindings.iterator().next().get("topic")); kn.stop().get(); From 639b20708d87dd6fe1362313f5c091107620f7e4 Mon Sep 17 00:00:00 2001 From: Barry Nouwt Date: Mon, 26 Feb 2024 16:02:20 +0100 Subject: [PATCH 19/19] Restore notify when ker details change. --- .../smartconnector/runtime/messaging/RemoteKerConnection.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java index aab811863..a0d2e7ab8 100644 --- a/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java +++ b/smart-connector/src/main/java/eu/knowledge/engine/smartconnector/runtime/messaging/RemoteKerConnection.java @@ -222,7 +222,7 @@ public boolean representsKnowledgeBase(URI knowledgeBaseId) { public void updateKerDetails(KnowledgeEngineRuntimeDetails kerDetails) { // TODO implement checks? this.remoteKerDetails = kerDetails; -// dispatcher.notifySmartConnectorsChanged(); + dispatcher.notifySmartConnectorsChanged(); } public void start() {