From 79555b1008355ab91842ac4273baedb161c64ca2 Mon Sep 17 00:00:00 2001 From: kasemir Date: Wed, 2 Apr 2025 12:19:39 -0400 Subject: [PATCH 01/11] Move TCP connection to thread, fully unregister completed search --- .../org/epics/pva/client/ChannelSearch.java | 10 ++-- .../java/org/epics/pva/client/PVAClient.java | 48 +++++++++++-------- 2 files changed, 33 insertions(+), 25 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java index 6d8c89415f..67779903bb 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java +++ b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java @@ -274,11 +274,11 @@ public PVAChannel unregister(final int channel_id) if (searched != null) { logger.log(Level.FINE, () -> "Unregister search for " + searched.channel.getName() + " " + channel_id); - // NOT removing `searched` from all `search_buckets`. - // Removal would be a slow, linear operation. - // `runSearches()` will drop the channel from `search_buckets` - // because it's no longer listed in `searched_channels` - + synchronized (search_buckets) + { + for (LinkedList bucket : search_buckets) + bucket.remove(searched); + } return searched.channel; } return null; diff --git a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java index 4f22ad140a..2e0ceb92b2 100644 --- a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java +++ b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java @@ -250,32 +250,40 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server, channel.setState(ClientChannelState.FOUND); logger.log(Level.FINE, () -> "Reply for " + channel + " from " + (tls ? "TLS " : "TCP ") + server + " " + guid); - final ClientTCPHandler tcp = tcp_handlers.computeIfAbsent(server, addr -> + // TCP connection can be slow, especially when blocked by firewall, so move to thread + // TODO Lightweight thread? Thread pool? + final Thread setup_tcp = new Thread(() -> { - try + final ClientTCPHandler tcp = tcp_handlers.computeIfAbsent(server, addr -> { - return new ClientTCPHandler(this, addr, guid, tls); + try + { + return new ClientTCPHandler(this, addr, guid, tls); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); + } + return null; + }); + // In case of connection errors, tcp will be null + if (tcp == null) + { // Cannot connect to server on provided port? Likely a server or firewall problem. + // On the next search, that same server might reply and then we fail the same way on connect. + // Still, no way around re-registering the search so we succeed once the server is fixed. + search.register(channel, false /* not "now" but eventually */); } - catch (Exception ex) + else { - logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); + if (tcp.updateGuid(guid)) + logger.log(Level.FINE, "Search-only TCP handler received GUID, now " + tcp); + + channel.registerWithServer(tcp); } - return null; }); - // In case of connection errors, tcp will be null - if (tcp == null) - { // Cannot connect to server on provided port? Likely a server or firewall problem. - // On the next search, that same server might reply and then we fail the same way on connect. - // Still, no way around re-registering the search so we succeed once the server is fixed. - search.register(channel, false /* not "now" but eventually */); - } - else - { - if (tcp.updateGuid(guid)) - logger.log(Level.FINE, "Search-only TCP handler received GUID, now " + tcp); - - channel.registerWithServer(tcp); - } + setup_tcp.setName("TCP connect " + server); + setup_tcp.setDaemon(true); + setup_tcp.start(); } /** Called by {@link ClientTCPHandler} when connection is lost or closed because unused From a9f8424956a2a0659d46b7700cdb862229710dab Mon Sep 17 00:00:00 2001 From: kasemir Date: Wed, 2 Apr 2025 12:22:07 -0400 Subject: [PATCH 02/11] align --- core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java index 67779903bb..c679430270 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java +++ b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java @@ -278,7 +278,7 @@ public PVAChannel unregister(final int channel_id) { for (LinkedList bucket : search_buckets) bucket.remove(searched); - } + } return searched.channel; } return null; From fb0a348d1f0b69f1f78ceb7f4a4f5ef07fba47ff Mon Sep 17 00:00:00 2001 From: kasemir Date: Thu, 3 Apr 2025 06:58:49 -0400 Subject: [PATCH 03/11] ChannelSearch bucket now Set of searched channels --- .../org/epics/pva/client/ChannelSearch.java | 42 +++++++++++++------ 1 file changed, 29 insertions(+), 13 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java index c679430270..cfc6d2b93c 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java +++ b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java @@ -13,9 +13,10 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; +import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -99,6 +100,22 @@ private class SearchedChannel // Otherwise run risk of getting reply without being able // to handle it } + + // Hash by channel name + @Override + public int hashCode() + { + return channel.getName().hashCode(); + } + + // Compare by channel name + @Override + public boolean equals(Object obj) + { + if (obj instanceof SearchedChannel other) + return other.channel.getName().equals(channel.getName()); + return false; + } } // SearchedChannels are tracked in two data structures @@ -119,7 +136,7 @@ private class SearchedChannel /** Search buckets * - *

The {@link #current_search_bucket} selects the list + *

The {@link #current_search_bucket} selects the set * of channels to be searched by {@link #runSearches()}, * which runs roughly once per second, each time moving to * the next search bucket in a ring buffer fashion. @@ -138,7 +155,7 @@ private class SearchedChannel *

Access to either {@link #search_buckets} or {@link #current_search_bucket} * must SYNC on {@link #search_buckets}. */ - private final ArrayList> search_buckets = new ArrayList<>(); + private final ArrayList> search_buckets = new ArrayList<>(MAX_SEARCH_PERIOD+2); /** Index of current search bucket, i.e. the one about to be searched. * @@ -188,7 +205,7 @@ public ChannelSearch(final ClientUDPHandler udp, synchronized (search_buckets) { for (int i=0; i()); + search_buckets.add(new HashSet<>()); } // Searches sent to multicast (IPv4, IPv6) or broadcast addresses (IPv4) reach every PVA server @@ -276,7 +293,7 @@ public PVAChannel unregister(final int channel_id) logger.log(Level.FINE, () -> "Unregister search for " + searched.channel.getName() + " " + channel_id); synchronized (search_buckets) { - for (LinkedList bucket : search_buckets) + for (Set bucket : search_buckets) bucket.remove(searched); } return searched.channel; @@ -301,9 +318,8 @@ public void boost() logger.log(Level.FINE, () -> "Restart search for '" + searched.channel.getName() + "'"); synchronized (search_buckets) { - final LinkedList bucket = search_buckets.get(current_search_bucket.get()); - if (! bucket.contains(searched)) - bucket.add(searched); + final Set bucket = search_buckets.get(current_search_bucket.get()); + bucket.add(searched); } } // Not sending search right now: @@ -327,12 +343,11 @@ private void runSearches() { // Determine current search bucket final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size()); - final LinkedList bucket = search_buckets.get(current); + final Set bucket = search_buckets.get(current); logger.log(Level.FINEST, () -> "Search bucket " + current); // Remove searched channels from the current bucket - SearchedChannel sc; - while ((sc = bucket.poll()) != null) + for (SearchedChannel sc : bucket) { if (sc.channel.getState() == ClientChannelState.SEARCHING && searched_channels.containsKey(sc.channel.getCID())) @@ -349,8 +364,8 @@ private void runSearches() // in case that search bucket is quite full final int i_n = (current + period) % search_buckets.size(); final int i_n_n = (i_n + 1) % search_buckets.size(); - final LinkedList next = search_buckets.get(i_n); - final LinkedList next_next = search_buckets.get(i_n_n); + final Set next = search_buckets.get(i_n); + final Set next_next = search_buckets.get(i_n_n); if (i_n == current || i_n_n == current) throw new IllegalStateException("Current, next and nextnext search indices for " + sc.channel + " are " + current + ", " + i_n + ", " + i_n_n); @@ -362,6 +377,7 @@ private void runSearches() else logger.log(Level.FINE, "Dropping channel from search: " + sc.channel); } + bucket.clear(); } From f1af698b7f6bc2ab2aab2d9a84236d97c7b2e0d8 Mon Sep 17 00:00:00 2001 From: kasemir Date: Thu, 3 Apr 2025 07:43:03 -0400 Subject: [PATCH 04/11] PVA client: Use `Future` This way, `tcp_handlers` can provide the `Future` without delays, while the slower TCP connection is then awaited when getting the future's value --- .../org/epics/pva/client/ChannelSearch.java | 42 +++++---- .../java/org/epics/pva/client/PVAClient.java | 88 +++++++++++++------ 2 files changed, 88 insertions(+), 42 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java index cfc6d2b93c..5366e0d4fe 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java +++ b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java @@ -19,6 +19,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -174,7 +175,7 @@ public boolean equals(Object obj) private final ClientUDPHandler udp; /** Create ClientTCPHandler from IP address and 'tls' flag */ - private final BiFunction tcp_provider; + private final BiFunction> tcp_provider; /** Buffer for assembling search messages */ private final ByteBuffer send_buffer = ByteBuffer.allocate(PVASettings.MAX_UDP_UNFRAGMENTED_SEND); @@ -196,7 +197,7 @@ public boolean equals(Object obj) */ public ChannelSearch(final ClientUDPHandler udp, final List udp_addresses, - final BiFunction tcp_provider, + final BiFunction> tcp_provider, final List name_server_addresses) throws Exception { this.udp = udp; @@ -453,28 +454,35 @@ private void search(final Collection channels) { // For search via TCP, do we use plain TCP or do we send the search itself via TLS? // This is configured in EPICS_PVA_NAME_SERVERS via prefix pvas:// - final ClientTCPHandler tcp = tcp_provider.apply(name_server.getAddress(), name_server.isTLS()); + final Future create_tcp = tcp_provider.apply(name_server.getAddress(), name_server.isTLS()); + final ClientTCPHandler tcp; + try + { + tcp = create_tcp.get(); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "Cannot obtain TCP handler to search " + name_server, ex); + continue; + } // In case of connection errors (TCP connection blocked by firewall), // tcp will be null - if (tcp != null) + final RequestEncoder search_request = (version, buffer) -> { - final RequestEncoder search_request = (version, buffer) -> - { - logger.log(Level.FINE, () -> "Searching for " + channels + " via TCP " + tcp.getRemoteAddress()); + logger.log(Level.FINE, () -> "Searching for " + channels + " via TCP " + tcp.getRemoteAddress()); - // Search sequence identifies the potentially repeated UDP. - // TCP search is once only, so PVXS always sends 0x66696E64 = "find". - // We send "look" ("kool" for little endian). - final int seq = 0x6C6F6F6B; + // Search sequence identifies the potentially repeated UDP. + // TCP search is once only, so PVXS always sends 0x66696E64 = "find". + // We send "look" ("kool" for little endian). + final int seq = 0x6C6F6F6B; - // Use 'any' reply address since reply will be via this TCP socket - final InetSocketAddress response_address = new InetSocketAddress(0); + // Use 'any' reply address since reply will be via this TCP socket + final InetSocketAddress response_address = new InetSocketAddress(0); - SearchRequest.encode(true, seq, channels, response_address, tls , buffer); - }; - tcp.submit(search_request); - } + SearchRequest.encode(true, seq, channels, response_address, tls , buffer); + }; + tcp.submit(search_request); } // Shortcut UDP search, avoid log messages when lists are empty diff --git a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java index 2e0ceb92b2..fda5ebee6d 100644 --- a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java +++ b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java @@ -12,7 +12,9 @@ import java.net.InetSocketAddress; import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; @@ -58,7 +60,7 @@ public class PVAClient implements AutoCloseable private final ConcurrentHashMap channels_by_id = new ConcurrentHashMap<>(); /** TCP handlers by server address */ - private final ConcurrentHashMap tcp_handlers = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> tcp_handlers = new ConcurrentHashMap<>(); private final AtomicInteger request_ids = new AtomicInteger(); @@ -89,20 +91,24 @@ public PVAClient() throws Exception // TCP traffic is handled by one ClientTCPHandler per address (IP, socket). // Pass helper to channel search for getting such a handler. - final BiFunction tcp_provider = (the_addr, use_tls) -> + final BiFunction> tcp_provider = (the_addr, use_tls) -> tcp_handlers.computeIfAbsent(the_addr, addr -> { - try - { - // If absent, create with initial empty GUID - return new ClientTCPHandler(this, addr, Guid.EMPTY, use_tls); - } - catch (Exception ex) + // If absent, create with initial empty GUID + final CompletableFuture create_tcp = new CompletableFuture<>(); + create_tcp.completeAsync(() -> { - logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); - } - return null; - + try + { + return new ClientTCPHandler(this, addr, Guid.EMPTY, use_tls); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); + } + return null; + }); + return create_tcp; }); search = new ChannelSearch(udp, udp_search_addresses, tcp_provider, name_server_addresses); @@ -254,18 +260,33 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server, // TODO Lightweight thread? Thread pool? final Thread setup_tcp = new Thread(() -> { - final ClientTCPHandler tcp = tcp_handlers.computeIfAbsent(server, addr -> + final Future tcp_future = tcp_handlers.computeIfAbsent(server, addr -> { - try + final CompletableFuture create_tcp = new CompletableFuture<>(); + create_tcp.completeAsync(() -> { - return new ClientTCPHandler(this, addr, guid, tls); - } - catch (Exception ex) - { - logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); - } - return null; + try + { + return new ClientTCPHandler(this, addr, guid, tls); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); + } + return null; + }); + return create_tcp; }); + ClientTCPHandler tcp; + try + { + tcp = tcp_future.get(); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "Cannot connect to " + server, ex); + tcp = null; + } // In case of connection errors, tcp will be null if (tcp == null) { // Cannot connect to server on provided port? Likely a server or firewall problem. @@ -277,7 +298,6 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server, { if (tcp.updateGuid(guid)) logger.log(Level.FINE, "Search-only TCP handler received GUID, now " + tcp); - channel.registerWithServer(tcp); } }); @@ -296,7 +316,18 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server, void shutdownConnection(final ClientTCPHandler tcp) { // Forget this connection - final ClientTCPHandler removed = tcp_handlers.remove(tcp.getRemoteAddress()); + final Future tcp_future = tcp_handlers.remove(tcp.getRemoteAddress()); + final ClientTCPHandler removed; + try + { + removed = tcp_future == null ? null : tcp_future.get(); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "Cannot obtain TCP client to close for " + tcp, ex); + return; + } + if (removed != tcp) logger.log(Level.WARNING, "Closed unknown " + tcp, new Exception("Call stack")); @@ -360,8 +391,15 @@ public void close() } // Stop TCP and UDP threads - for (ClientTCPHandler handler : tcp_handlers.values()) - handler.close(true); + for (Future handler : tcp_handlers.values()) + try + { + handler.get().close(true); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "PVA Client error getting channel to close", ex); + } udp.close(); } From c9d607b5ad3b96ab2c77397f76be9ff510fe2ddd Mon Sep 17 00:00:00 2001 From: kasemir Date: Thu, 3 Apr 2025 07:49:04 -0400 Subject: [PATCH 05/11] PVA client uses virtual threads for TCP connection --- .../pva/src/main/java/org/epics/pva/client/PVAClient.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java index fda5ebee6d..ff6f530280 100644 --- a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java +++ b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java @@ -257,8 +257,9 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server, logger.log(Level.FINE, () -> "Reply for " + channel + " from " + (tls ? "TLS " : "TCP ") + server + " " + guid); // TCP connection can be slow, especially when blocked by firewall, so move to thread - // TODO Lightweight thread? Thread pool? - final Thread setup_tcp = new Thread(() -> + Thread.ofVirtual() + .name("TCP connect " + server) + .start(() -> { final Future tcp_future = tcp_handlers.computeIfAbsent(server, addr -> { @@ -301,9 +302,6 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server, channel.registerWithServer(tcp); } }); - setup_tcp.setName("TCP connect " + server); - setup_tcp.setDaemon(true); - setup_tcp.start(); } /** Called by {@link ClientTCPHandler} when connection is lost or closed because unused From f98255147017141062dc08eccf5a0d7ab2c013ea Mon Sep 17 00:00:00 2001 From: Abraham Wolk Date: Fri, 4 Apr 2025 08:14:00 +0200 Subject: [PATCH 06/11] CSSTUDIO-3113 Create a virtual thread only for the TCP connection attempt to the IOC. --- .../java/org/epics/pva/client/PVAClient.java | 80 +++++++++---------- 1 file changed, 37 insertions(+), 43 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java index ff6f530280..41e7548dee 100644 --- a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java +++ b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java @@ -256,52 +256,46 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server, channel.setState(ClientChannelState.FOUND); logger.log(Level.FINE, () -> "Reply for " + channel + " from " + (tls ? "TLS " : "TCP ") + server + " " + guid); - // TCP connection can be slow, especially when blocked by firewall, so move to thread - Thread.ofVirtual() - .name("TCP connect " + server) - .start(() -> + final Future tcp_future = tcp_handlers.computeIfAbsent(server, addr -> { - final Future tcp_future = tcp_handlers.computeIfAbsent(server, addr -> - { - final CompletableFuture create_tcp = new CompletableFuture<>(); - create_tcp.completeAsync(() -> - { - try - { - return new ClientTCPHandler(this, addr, guid, tls); - } - catch (Exception ex) + final CompletableFuture new_tcp_future = new CompletableFuture<>(); + + // Trying to establish a TCP connection is blocking and can be slow, + // especially when blocked by firewall. Therefore, attempt the TCP + // connection on a separate virtual thread: + Thread.ofVirtual().name("TCP connect " + server) + .start(() -> { - logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); - } - return null; - }); - return create_tcp; - }); - ClientTCPHandler tcp; - try - { - tcp = tcp_future.get(); - } - catch (Exception ex) - { - logger.log(Level.WARNING, "Cannot connect to " + server, ex); - tcp = null; - } - // In case of connection errors, tcp will be null - if (tcp == null) - { // Cannot connect to server on provided port? Likely a server or firewall problem. - // On the next search, that same server might reply and then we fail the same way on connect. - // Still, no way around re-registering the search so we succeed once the server is fixed. - search.register(channel, false /* not "now" but eventually */); - } - else - { - if (tcp.updateGuid(guid)) - logger.log(Level.FINE, "Search-only TCP handler received GUID, now " + tcp); - channel.registerWithServer(tcp); - } + try { + var client_tcp_handler = new ClientTCPHandler(this, addr, guid, tls); + new_tcp_future.complete(client_tcp_handler); + } + catch (Exception ex) + { + logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); + } + new_tcp_future.complete(null); + }); + + return new_tcp_future; }); + ClientTCPHandler tcp; + try { + tcp = tcp_future.get(); + } catch (Exception ex) { + logger.log(Level.WARNING, "Cannot connect to " + server, ex); + tcp = null; + } + // In case of connection errors, tcp will be null + if (tcp == null) { // Cannot connect to server on provided port? Likely a server or firewall problem. + // On the next search, that same server might reply and then we fail the same way on connect. + // Still, no way around re-registering the search so we succeed once the server is fixed. + search.register(channel, false /* not "now" but eventually */); + } else { + if (tcp.updateGuid(guid)) + logger.log(Level.FINE, "Search-only TCP handler received GUID, now " + tcp); + channel.registerWithServer(tcp); + } } /** Called by {@link ClientTCPHandler} when connection is lost or closed because unused From 97dc5f792c21f31209e5e90083e3c3e48d318a68 Mon Sep 17 00:00:00 2001 From: Abraham Wolk Date: Fri, 4 Apr 2025 08:43:04 +0200 Subject: [PATCH 07/11] CSSTUDIO-3113 Bugfix: Create outer thread also to prevent blocking when calling tcp_future.get(). --- .../java/org/epics/pva/client/PVAClient.java | 77 ++++++++++--------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java index 41e7548dee..448e98ab01 100644 --- a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java +++ b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java @@ -256,46 +256,47 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server, channel.setState(ClientChannelState.FOUND); logger.log(Level.FINE, () -> "Reply for " + channel + " from " + (tls ? "TLS " : "TCP ") + server + " " + guid); - final Future tcp_future = tcp_handlers.computeIfAbsent(server, addr -> - { - final CompletableFuture new_tcp_future = new CompletableFuture<>(); - - // Trying to establish a TCP connection is blocking and can be slow, - // especially when blocked by firewall. Therefore, attempt the TCP - // connection on a separate virtual thread: - Thread.ofVirtual().name("TCP connect " + server) - .start(() -> + Thread.ofVirtual().name("Get TCP connection to " + server) + .start(() -> { + final Future tcp_future = tcp_handlers.computeIfAbsent(server, addr -> { - try { - var client_tcp_handler = new ClientTCPHandler(this, addr, guid, tls); - new_tcp_future.complete(client_tcp_handler); - } - catch (Exception ex) - { - logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); - } - new_tcp_future.complete(null); + final CompletableFuture new_tcp_future = new CompletableFuture<>(); + + // Trying to establish a TCP connection is blocking and can be slow, + // especially when blocked by firewall. Therefore, attempt the TCP + // connection on a separate virtual thread: + Thread.ofVirtual().name("Establish TCP connection to " + server) + .start(() -> + { + try { + var client_tcp_handler = new ClientTCPHandler(this, addr, guid, tls); + new_tcp_future.complete(client_tcp_handler); + } catch (Exception ex) { + logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); + } + new_tcp_future.complete(null); + }); + + return new_tcp_future; }); - - return new_tcp_future; - }); - ClientTCPHandler tcp; - try { - tcp = tcp_future.get(); - } catch (Exception ex) { - logger.log(Level.WARNING, "Cannot connect to " + server, ex); - tcp = null; - } - // In case of connection errors, tcp will be null - if (tcp == null) { // Cannot connect to server on provided port? Likely a server or firewall problem. - // On the next search, that same server might reply and then we fail the same way on connect. - // Still, no way around re-registering the search so we succeed once the server is fixed. - search.register(channel, false /* not "now" but eventually */); - } else { - if (tcp.updateGuid(guid)) - logger.log(Level.FINE, "Search-only TCP handler received GUID, now " + tcp); - channel.registerWithServer(tcp); - } + ClientTCPHandler tcp; + try { + tcp = tcp_future.get(); + } catch (Exception ex) { + logger.log(Level.WARNING, "Cannot connect to " + server, ex); + tcp = null; + } + // In case of connection errors, tcp will be null + if (tcp == null) { // Cannot connect to server on provided port? Likely a server or firewall problem. + // On the next search, that same server might reply and then we fail the same way on connect. + // Still, no way around re-registering the search so we succeed once the server is fixed. + search.register(channel, false /* not "now" but eventually */); + } else { + if (tcp.updateGuid(guid)) + logger.log(Level.FINE, "Search-only TCP handler received GUID, now " + tcp); + channel.registerWithServer(tcp); + } + }); } /** Called by {@link ClientTCPHandler} when connection is lost or closed because unused From 31ccb76cb41afb507168e0a8bfbb8fa14f8ea924 Mon Sep 17 00:00:00 2001 From: Abraham Wolk Date: Tue, 8 Apr 2025 09:43:24 +0200 Subject: [PATCH 08/11] CSSTUDIO-3113 Bugfix: Fix concurrency issue by adding "synchronized" to methods. --- .../main/java/org/epics/pva/client/ChannelSearch.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java index 5366e0d4fe..36ca064761 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java +++ b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java @@ -260,7 +260,7 @@ public void start() /** @param channel Channel that should be searched * @param now Start searching as soon as possible, or delay? */ - public void register(final PVAChannel channel, final boolean now) + public synchronized void register(final PVAChannel channel, final boolean now) { logger.log(Level.FINE, () -> "Register search for " + channel + (now ? " now" : " soon")); @@ -286,7 +286,7 @@ public void register(final PVAChannel channel, final boolean now) * @param channel_id * @return {@link PVAChannel}, null when channel wasn't searched any more */ - public PVAChannel unregister(final int channel_id) + public synchronized PVAChannel unregister(final int channel_id) { final SearchedChannel searched = searched_channels.remove(channel_id); if (searched != null) @@ -306,7 +306,7 @@ public PVAChannel unregister(final int channel_id) * *

Resets their search counter so they're searched "real soon". */ - public void boost() + public synchronized void boost() { for (SearchedChannel searched : searched_channels.values()) { @@ -337,7 +337,7 @@ public void boost() /** Invoked by timer: Check searched channels for the next one to handle */ @SuppressWarnings("unchecked") - private void runSearches() + private synchronized void runSearches() { to_search.clear(); synchronized (search_buckets) @@ -548,7 +548,7 @@ private void sendSearch(final int seq, final Collection c } /** Stop searching channels */ - public void close() + public synchronized void close() { searched_channels.clear(); From 971903c396619246683b9f43adf3faedcfd59e15 Mon Sep 17 00:00:00 2001 From: Abraham Wolk Date: Tue, 8 Apr 2025 10:42:58 +0200 Subject: [PATCH 09/11] CSSTUDIO-3113 Rely on methods being 'synchronized' instead of synchronizing on 'search_buckets'. --- .../org/epics/pva/client/ChannelSearch.java | 109 ++++++++---------- 1 file changed, 47 insertions(+), 62 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java index 36ca064761..2016ad78dc 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java +++ b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java @@ -154,13 +154,13 @@ public boolean equals(Object obj) * which would result in an endless loop. * *

Access to either {@link #search_buckets} or {@link #current_search_bucket} - * must SYNC on {@link #search_buckets}. + * must only occur in a 'synchronized' method. */ private final ArrayList> search_buckets = new ArrayList<>(MAX_SEARCH_PERIOD+2); /** Index of current search bucket, i.e. the one about to be searched. * - *

Access must SYNC on {@link #search_buckets}. + *

Access must only occur in a 'synchronized' method. */ private final AtomicInteger current_search_bucket = new AtomicInteger(); @@ -203,11 +203,9 @@ public ChannelSearch(final ClientUDPHandler udp, this.udp = udp; this.tcp_provider = tcp_provider; - synchronized (search_buckets) - { - for (int i=0; i()); - } + + for (int i = 0; i < MAX_SEARCH_PERIOD + 2; ++i) + search_buckets.add(new HashSet<>()); // Searches sent to multicast (IPv4, IPv6) or broadcast addresses (IPv4) reach every PVA server // on that multicast group or bcast subnet. @@ -270,13 +268,11 @@ public synchronized void register(final PVAChannel channel, final boolean now) final SearchedChannel sc = searched_channels.computeIfAbsent(channel.getCID(), id -> new SearchedChannel(channel)); - synchronized (search_buckets) - { - int bucket = current_search_bucket.get(); - if (!now) - bucket = (bucket + SEARCH_SOON_DELAY) % search_buckets.size(); - search_buckets.get(bucket).add(sc); - } + int bucket = current_search_bucket.get(); + if (!now) + bucket = (bucket + SEARCH_SOON_DELAY) % search_buckets.size(); + search_buckets.get(bucket).add(sc); + // Jumpstart search instead of waiting up to ~1 second for current bucket to be handled if (now) timer.execute(this::runSearches); @@ -292,11 +288,8 @@ public synchronized PVAChannel unregister(final int channel_id) if (searched != null) { logger.log(Level.FINE, () -> "Unregister search for " + searched.channel.getName() + " " + channel_id); - synchronized (search_buckets) - { - for (Set bucket : search_buckets) - bucket.remove(searched); - } + for (Set bucket : search_buckets) + bucket.remove(searched); return searched.channel; } return null; @@ -317,11 +310,9 @@ public synchronized void boost() if (period == MIN_SEARCH_PERIOD) { logger.log(Level.FINE, () -> "Restart search for '" + searched.channel.getName() + "'"); - synchronized (search_buckets) - { - final Set bucket = search_buckets.get(current_search_bucket.get()); - bucket.add(searched); - } + + final Set bucket = search_buckets.get(current_search_bucket.get()); + bucket.add(searched); } // Not sending search right now: // search(channel); @@ -340,47 +331,41 @@ public synchronized void boost() private synchronized void runSearches() { to_search.clear(); - synchronized (search_buckets) - { - // Determine current search bucket - final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size()); - final Set bucket = search_buckets.get(current); - logger.log(Level.FINEST, () -> "Search bucket " + current); - // Remove searched channels from the current bucket - for (SearchedChannel sc : bucket) - { - if (sc.channel.getState() == ClientChannelState.SEARCHING && - searched_channels.containsKey(sc.channel.getCID())) - { - // Collect channels in 'to_search' for handling outside of sync. section - to_search.add(sc.channel); - - // Determine next search period - final int period = sc.search_period.updateAndGet(sec -> sec < MAX_SEARCH_PERIOD - ? sec + 1 - : MAX_SEARCH_PERIOD); - - // Add to corresponding search bucket, or delay by one second - // in case that search bucket is quite full - final int i_n = (current + period) % search_buckets.size(); - final int i_n_n = (i_n + 1) % search_buckets.size(); - final Set next = search_buckets.get(i_n); - final Set next_next = search_buckets.get(i_n_n); - if (i_n == current || i_n_n == current) - throw new IllegalStateException("Current, next and nextnext search indices for " + sc.channel + " are " + - current + ", " + i_n + ", " + i_n_n); - if (next_next.size() < next.size()) - next_next.add(sc); - else - next.add(sc); - } + // Determine current search bucket + final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size()); + final Set bucket = search_buckets.get(current); + logger.log(Level.FINEST, () -> "Search bucket " + current); + + // Remove searched channels from the current bucket + for (SearchedChannel sc : bucket) { + if (sc.channel.getState() == ClientChannelState.SEARCHING && + searched_channels.containsKey(sc.channel.getCID())) { + // Collect channels in 'to_search' for handling outside of sync. section + to_search.add(sc.channel); + + // Determine next search period + final int period = sc.search_period.updateAndGet(sec -> sec < MAX_SEARCH_PERIOD + ? sec + 1 + : MAX_SEARCH_PERIOD); + + // Add to corresponding search bucket, or delay by one second + // in case that search bucket is quite full + final int i_n = (current + period) % search_buckets.size(); + final int i_n_n = (i_n + 1) % search_buckets.size(); + final Set next = search_buckets.get(i_n); + final Set next_next = search_buckets.get(i_n_n); + if (i_n == current || i_n_n == current) + throw new IllegalStateException("Current, next and nextnext search indices for " + sc.channel + " are " + + current + ", " + i_n + ", " + i_n_n); + if (next_next.size() < next.size()) + next_next.add(sc); else - logger.log(Level.FINE, "Dropping channel from search: " + sc.channel); - } - bucket.clear(); + next.add(sc); + } else + logger.log(Level.FINE, "Dropping channel from search: " + sc.channel); } - + bucket.clear(); // Search batch.. // Size of a search request is close to 50 bytes From bcbc14f7d70c736370bee5b3a2e17f644446649c Mon Sep 17 00:00:00 2001 From: Abraham Wolk Date: Tue, 8 Apr 2025 10:50:47 +0200 Subject: [PATCH 10/11] CSSTUDIO-3113 Change the type of 'searched_channels' from ConcurrentHashMap<> to HashMap<>. --- .../pva/src/main/java/org/epics/pva/client/ChannelSearch.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java index 2016ad78dc..2def8dd82a 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java +++ b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java @@ -13,11 +13,11 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Random; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -133,7 +133,7 @@ public boolean equals(Object obj) // up to MAX_SEARCH_PERIOD. /** Map of searched channels by channel ID */ - private ConcurrentHashMap searched_channels = new ConcurrentHashMap<>(); + private HashMap searched_channels = new HashMap<>(); /** Search buckets * From 5df73f66a6ddd915fa08d49e62dcfd1e64cfe70e Mon Sep 17 00:00:00 2001 From: Abraham Wolk Date: Tue, 8 Apr 2025 11:21:22 +0200 Subject: [PATCH 11/11] CSSTUDIO-3113 Call search.register() on the thread that tries to establish the TCP connection. --- .../main/java/org/epics/pva/client/PVAClient.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java index 448e98ab01..3f349b708f 100644 --- a/core/pva/src/main/java/org/epics/pva/client/PVAClient.java +++ b/core/pva/src/main/java/org/epics/pva/client/PVAClient.java @@ -273,8 +273,12 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server, new_tcp_future.complete(client_tcp_handler); } catch (Exception ex) { logger.log(Level.WARNING, "Cannot connect to TCP " + addr, ex); + // Cannot connect to server on provided port? Likely a server or firewall problem. + // On the next search, that same server might reply and then we fail the same way on connect. + // Still, no way around re-registering the search so we succeed once the server is fixed. + search.register(channel, false /* not "now" but eventually */); + new_tcp_future.complete(null); } - new_tcp_future.complete(null); }); return new_tcp_future; @@ -287,11 +291,7 @@ void handleSearchResponse(final int channel_id, final InetSocketAddress server, tcp = null; } // In case of connection errors, tcp will be null - if (tcp == null) { // Cannot connect to server on provided port? Likely a server or firewall problem. - // On the next search, that same server might reply and then we fail the same way on connect. - // Still, no way around re-registering the search so we succeed once the server is fixed. - search.register(channel, false /* not "now" but eventually */); - } else { + if (tcp != null) { if (tcp.updateGuid(guid)) logger.log(Level.FINE, "Search-only TCP handler received GUID, now " + tcp); channel.registerWithServer(tcp);