From 859a1580d25e23acea8c2f9d7a936d0df7b4b84a Mon Sep 17 00:00:00 2001 From: kasemir Date: Tue, 10 Jun 2025 11:13:46 -0400 Subject: [PATCH 1/2] PVA: Search cleanup .. and if TCP handler can't connect, quit so that client will then re-start the search --- .../org/epics/pva/client/ChannelSearch.java | 120 ++++++++++-------- .../epics/pva/client/ClientTCPHandler.java | 9 +- .../java/org/epics/pva/common/TCPHandler.java | 29 ++--- .../epics/pva/server/ServerTCPHandler.java | 6 + 4 files changed, 93 insertions(+), 71 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java index 6d8c89415f..142bcaf3a2 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java +++ b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2019-2023 Oak Ridge National Laboratory. + * Copyright (c) 2019-2025 Oak Ridge National Laboratory. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at @@ -13,10 +13,11 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -99,27 +100,47 @@ private class SearchedChannel // Otherwise run risk of getting reply without being able // to handle it } + + // Hash by channel name + @Override + public int hashCode() + { + return channel.getName().hashCode(); + } + + // Compare by channel name + @Override + public boolean equals(Object obj) + { + if (obj instanceof SearchedChannel other) + return other.channel.getName().equals(channel.getName()); + return false; + } } // SearchedChannels are tracked in two data structures // - // - searched_channels (concurrent) + // - searched_channels // Fast lookup of channel by ID, - // efficient `computeIfAbsent(cid, ..` mechanism for creating - // at most one SearchedChannel per CID. + // creating at most one SearchedChannel per CID. // Allows checking if a channel is indeed searched, // and locating the channel for a search reply. // - // - search_buckets (need to SYNC) + // - search_buckets // Efficiently schedule the search messages for all channels // up to MAX_SEARCH_PERIOD. + // + // Access to either one needs to be synchronized - /** Map of searched channels by channel ID */ - private ConcurrentHashMap searched_channels = new ConcurrentHashMap<>(); + /** Map of searched channels by channel ID + * + * Access only from synchronized method + */ + private HashMap searched_channels = new HashMap<>(); /** Search buckets * - *

The {@link #current_search_bucket} selects the list + *

The {@link #current_search_bucket} selects the set * of channels to be searched by {@link #runSearches()}, * which runs roughly once per second, each time moving to * the next search bucket in a ring buffer fashion. @@ -136,13 +157,13 @@ private class SearchedChannel * which would result in an endless loop. * *

Access to either {@link #search_buckets} or {@link #current_search_bucket} - * must SYNC on {@link #search_buckets}. + * must only occur in a 'synchronized' method. */ - private final ArrayList> search_buckets = new ArrayList<>(); + private final ArrayList> search_buckets = new ArrayList<>(MAX_SEARCH_PERIOD+2); /** Index of current search bucket, i.e. the one about to be searched. * - *

Access must SYNC on {@link #search_buckets}. + *

Access must only occur in a 'synchronized' method. */ private final AtomicInteger current_search_bucket = new AtomicInteger(); @@ -185,11 +206,9 @@ public ChannelSearch(final ClientUDPHandler udp, this.udp = udp; this.tcp_provider = tcp_provider; - synchronized (search_buckets) - { - for (int i=0; i()); - } + // Each bucket holds set of channels to search in that time slot + for (int i=0; i()); // Searches sent to multicast (IPv4, IPv6) or broadcast addresses (IPv4) reach every PVA server // on that multicast group or bcast subnet. @@ -227,7 +246,7 @@ else if (addr.isBroadcast()) public void start() { - // +-jitter to prevent multiple clients from sending concurrent search requests + // 1 second +-jitter to prevent multiple clients from sending concurrent search requests final long period = SEARCH_PERIOD_MS + (new Random().nextInt(2*SEARCH_JITTER_MS+1) - SEARCH_JITTER_MS); logger.log(Level.FINER, @@ -246,19 +265,20 @@ public void register(final PVAChannel channel, final boolean now) { logger.log(Level.FINE, () -> "Register search for " + channel + (now ? " now" : " soon")); - final ClientChannelState old = channel.setState(ClientChannelState.SEARCHING); - if (old == ClientChannelState.SEARCHING) - logger.log(Level.WARNING, "Registering channel " + channel + " to be searched more than once "); + synchronized (this) + { + final ClientChannelState old = channel.setState(ClientChannelState.SEARCHING); + if (old == ClientChannelState.SEARCHING) + logger.log(Level.WARNING, "Registering channel " + channel + " to be searched more than once "); - final SearchedChannel sc = searched_channels.computeIfAbsent(channel.getCID(), id -> new SearchedChannel(channel)); + final SearchedChannel sc = searched_channels.computeIfAbsent(channel.getCID(), id -> new SearchedChannel(channel)); - synchronized (search_buckets) - { int bucket = current_search_bucket.get(); if (!now) - bucket = (bucket + SEARCH_SOON_DELAY) % search_buckets.size(); - search_buckets.get(bucket).add(sc); + bucket = (bucket + SEARCH_SOON_DELAY) % search_buckets.size(); + search_buckets.get(bucket).add(sc); } + // Jumpstart search instead of waiting up to ~1 second for current bucket to be handled if (now) timer.execute(this::runSearches); @@ -268,17 +288,15 @@ public void register(final PVAChannel channel, final boolean now) * @param channel_id * @return {@link PVAChannel}, null when channel wasn't searched any more */ - public PVAChannel unregister(final int channel_id) + public synchronized PVAChannel unregister(final int channel_id) { final SearchedChannel searched = searched_channels.remove(channel_id); if (searched != null) { logger.log(Level.FINE, () -> "Unregister search for " + searched.channel.getName() + " " + channel_id); - // NOT removing `searched` from all `search_buckets`. - // Removal would be a slow, linear operation. - // `runSearches()` will drop the channel from `search_buckets` - // because it's no longer listed in `searched_channels` - + // Remove `searched` from all `search_buckets`. + for (Set bucket : search_buckets) + bucket.remove(searched); return searched.channel; } return null; @@ -288,7 +306,7 @@ public PVAChannel unregister(final int channel_id) * *

Resets their search counter so they're searched "real soon". */ - public void boost() + public synchronized void boost() { for (SearchedChannel searched : searched_channels.values()) { @@ -299,12 +317,9 @@ public void boost() if (period == MIN_SEARCH_PERIOD) { logger.log(Level.FINE, () -> "Restart search for '" + searched.channel.getName() + "'"); - synchronized (search_buckets) - { - final LinkedList bucket = search_buckets.get(current_search_bucket.get()); - if (! bucket.contains(searched)) - bucket.add(searched); - } + + final Set bucket = search_buckets.get(current_search_bucket.get()); + bucket.add(searched); } // Not sending search right now: // search(channel); @@ -322,17 +337,17 @@ public void boost() @SuppressWarnings("unchecked") private void runSearches() { + // Determine current search bucket + final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size()); + // Collect channels to be searched while sync'ed to_search.clear(); - synchronized (search_buckets) + synchronized (this) { - // Determine current search bucket - final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size()); - final LinkedList bucket = search_buckets.get(current); + final Set bucket = search_buckets.get(current); logger.log(Level.FINEST, () -> "Search bucket " + current); // Remove searched channels from the current bucket - SearchedChannel sc; - while ((sc = bucket.poll()) != null) + for (SearchedChannel sc : bucket) { if (sc.channel.getState() == ClientChannelState.SEARCHING && searched_channels.containsKey(sc.channel.getCID())) @@ -349,8 +364,8 @@ private void runSearches() // in case that search bucket is quite full final int i_n = (current + period) % search_buckets.size(); final int i_n_n = (i_n + 1) % search_buckets.size(); - final LinkedList next = search_buckets.get(i_n); - final LinkedList next_next = search_buckets.get(i_n_n); + final Set next = search_buckets.get(i_n); + final Set next_next = search_buckets.get(i_n_n); if (i_n == current || i_n_n == current) throw new IllegalStateException("Current, next and nextnext search indices for " + sc.channel + " are " + current + ", " + i_n + ", " + i_n_n); @@ -362,9 +377,9 @@ private void runSearches() else logger.log(Level.FINE, "Dropping channel from search: " + sc.channel); } + bucket.clear(); } - // Search batch.. // Size of a search request is close to 50 bytes // plus { int cid, string name } for each channel. @@ -441,6 +456,7 @@ private void search(final Collection channels) // In case of connection errors (TCP connection blocked by firewall), // tcp will be null + // TODO CHECK THAT with updated ClientTCPHandler that connects on receive thread if (tcp != null) { final RequestEncoder search_request = (version, buffer) -> @@ -526,8 +542,10 @@ private void sendSearch(final int seq, final Collection c /** Stop searching channels */ public void close() { - searched_channels.clear(); - + synchronized (this) + { + searched_channels.clear(); + } timer.shutdown(); } } diff --git a/core/pva/src/main/java/org/epics/pva/client/ClientTCPHandler.java b/core/pva/src/main/java/org/epics/pva/client/ClientTCPHandler.java index dd3eb671c0..c8aa81e352 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ClientTCPHandler.java +++ b/core/pva/src/main/java/org/epics/pva/client/ClientTCPHandler.java @@ -140,7 +140,7 @@ protected boolean initializeSocket() socket.setKeepAlive(true); } catch (Exception ex) - { + { logger.log(Level.WARNING, "PVA client cannot connect to " + server_address, ex); return false; } @@ -157,6 +157,13 @@ protected boolean initializeSocket() return true; } + @Override + public InetSocketAddress getRemoteAddress() + { + // socket may not be connected or null, return address to which we want to connect + return new InetSocketAddress(server_address.getAddress(), server_address.getPort()); + } + /** @return Client context */ PVAClient getClient() { diff --git a/core/pva/src/main/java/org/epics/pva/common/TCPHandler.java b/core/pva/src/main/java/org/epics/pva/common/TCPHandler.java index 265b97fb5c..7c7aab5bbf 100644 --- a/core/pva/src/main/java/org/epics/pva/common/TCPHandler.java +++ b/core/pva/src/main/java/org/epics/pva/common/TCPHandler.java @@ -144,6 +144,9 @@ public TCPHandler(final boolean client_mode) */ abstract protected boolean initializeSocket(); + /** @return Remote address of the TCP socket */ + abstract public InetSocketAddress getRemoteAddress(); + /** Start receiving data * To be called by Client/ServerTCPHandler when fully constructed */ @@ -167,12 +170,6 @@ protected void startSender() throws Exception throw new Exception("Send thread already running"); } - /** @return Remote address of this end of the TCP socket */ - public InetSocketAddress getRemoteAddress() - { - return new InetSocketAddress(socket.getInetAddress(), socket.getPort()); - } - /** @return Is the send queue idle/empty? */ protected boolean isSendQueueIdle() { @@ -269,20 +266,14 @@ protected void send(final ByteBuffer buffer) throws Exception /** Receiver */ private Void receiver() { - // Establish connection - Thread.currentThread().setName("TCP receiver"); - while (! initializeSocket()) - try - { // Delay for (another) connection timeout, at least 1 sec - Thread.sleep(Math.max(1, PVASettings.EPICS_PVA_TCP_SOCKET_TMO) * 1000); - } - catch (Exception ignore) - { - // NOP - } - // Listen on the connection try { + // Establish connection + Thread.currentThread().setName("TCP receiver"); + if (! initializeSocket()) + return null; + + // Listen on the connection Thread.currentThread().setName("TCP receiver " + socket.getLocalSocketAddress()); logger.log(Level.FINER, () -> Thread.currentThread().getName() + " started for " + socket.getRemoteSocketAddress()); logger.log(Level.FINER, "Native byte order " + receive_buffer.order()); @@ -348,8 +339,8 @@ private Void receiver() } finally { - onReceiverExited(running); logger.log(Level.FINER, Thread.currentThread().getName() + " done."); + onReceiverExited(running); } return null; } diff --git a/core/pva/src/main/java/org/epics/pva/server/ServerTCPHandler.java b/core/pva/src/main/java/org/epics/pva/server/ServerTCPHandler.java index ff294b9866..e56070105c 100644 --- a/core/pva/src/main/java/org/epics/pva/server/ServerTCPHandler.java +++ b/core/pva/src/main/java/org/epics/pva/server/ServerTCPHandler.java @@ -122,6 +122,12 @@ protected boolean initializeSocket() return true; } + @Override + public InetSocketAddress getRemoteAddress() + { + return new InetSocketAddress(socket.getInetAddress(), socket.getPort()); + } + PVAServer getServer() { return server; From 6026a81485983c3f21d919c6491a12127004342f Mon Sep 17 00:00:00 2001 From: kasemir Date: Tue, 10 Jun 2025 11:22:07 -0400 Subject: [PATCH 2/2] Explain remaining tcp null check --- .../src/main/java/org/epics/pva/client/ChannelSearch.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java index 142bcaf3a2..28ab70d8da 100644 --- a/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java +++ b/core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java @@ -276,7 +276,7 @@ public void register(final PVAChannel channel, final boolean now) int bucket = current_search_bucket.get(); if (!now) bucket = (bucket + SEARCH_SOON_DELAY) % search_buckets.size(); - search_buckets.get(bucket).add(sc); + search_buckets.get(bucket).add(sc); } // Jumpstart search instead of waiting up to ~1 second for current bucket to be handled @@ -454,9 +454,8 @@ private void search(final Collection channels) // This is configured in EPICS_PVA_NAME_SERVERS via prefix pvas:// final ClientTCPHandler tcp = tcp_provider.apply(name_server.getAddress(), name_server.isTLS()); - // In case of connection errors (TCP connection blocked by firewall), - // tcp will be null - // TODO CHECK THAT with updated ClientTCPHandler that connects on receive thread + // In older implementation, tcp was null in case of connection errors (TCP connection blocked by firewall). + // No longer expected to happen but check anyway if (tcp != null) { final RequestEncoder search_request = (version, buffer) ->