Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 69 additions & 52 deletions core/pva/src/main/java/org/epics/pva/client/ChannelSearch.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2019-2023 Oak Ridge National Laboratory.
* Copyright (c) 2019-2025 Oak Ridge National Laboratory.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
Expand All @@ -13,10 +13,11 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -99,27 +100,47 @@ private class SearchedChannel
// Otherwise run risk of getting reply without being able
// to handle it
}

// Hash by channel name
@Override
public int hashCode()
{
return channel.getName().hashCode();
}

// Compare by channel name
@Override
public boolean equals(Object obj)
{
if (obj instanceof SearchedChannel other)
return other.channel.getName().equals(channel.getName());
return false;
}
}

// SearchedChannels are tracked in two data structures
//
// - searched_channels (concurrent)
// - searched_channels
// Fast lookup of channel by ID,
// efficient `computeIfAbsent(cid, ..` mechanism for creating
// at most one SearchedChannel per CID.
// creating at most one SearchedChannel per CID.
// Allows checking if a channel is indeed searched,
// and locating the channel for a search reply.
//
// - search_buckets (need to SYNC)
// - search_buckets
// Efficiently schedule the search messages for all channels
// up to MAX_SEARCH_PERIOD.
//
// Access to either one needs to be synchronized

/** Map of searched channels by channel ID */
private ConcurrentHashMap<Integer, SearchedChannel> searched_channels = new ConcurrentHashMap<>();
/** Map of searched channels by channel ID
*
* Access only from synchronized method
*/
private HashMap<Integer, SearchedChannel> searched_channels = new HashMap<>();

/** Search buckets
*
* <p>The {@link #current_search_bucket} selects the list
* <p>The {@link #current_search_bucket} selects the set
* of channels to be searched by {@link #runSearches()},
* which runs roughly once per second, each time moving to
* the next search bucket in a ring buffer fashion.
Expand All @@ -136,13 +157,13 @@ private class SearchedChannel
* which would result in an endless loop.
*
* <p>Access to either {@link #search_buckets} or {@link #current_search_bucket}
* must SYNC on {@link #search_buckets}.
* must only occur in a 'synchronized' method.
*/
private final ArrayList<LinkedList<SearchedChannel>> search_buckets = new ArrayList<>();
private final ArrayList<Set<SearchedChannel>> search_buckets = new ArrayList<>(MAX_SEARCH_PERIOD+2);

/** Index of current search bucket, i.e. the one about to be searched.
*
* <p>Access must SYNC on {@link #search_buckets}.
* <p>Access must only occur in a 'synchronized' method.
*/
private final AtomicInteger current_search_bucket = new AtomicInteger();

Expand Down Expand Up @@ -185,11 +206,9 @@ public ChannelSearch(final ClientUDPHandler udp,
this.udp = udp;
this.tcp_provider = tcp_provider;

synchronized (search_buckets)
{
for (int i=0; i<MAX_SEARCH_PERIOD+2; ++i)
search_buckets.add(new LinkedList<>());
}
// Each bucket holds set of channels to search in that time slot
for (int i=0; i<MAX_SEARCH_PERIOD+2; ++i)
search_buckets.add(new HashSet<>());

// Searches sent to multicast (IPv4, IPv6) or broadcast addresses (IPv4) reach every PVA server
// on that multicast group or bcast subnet.
Expand Down Expand Up @@ -227,7 +246,7 @@ else if (addr.isBroadcast())

public void start()
{
// +-jitter to prevent multiple clients from sending concurrent search requests
// 1 second +-jitter to prevent multiple clients from sending concurrent search requests
final long period = SEARCH_PERIOD_MS + (new Random().nextInt(2*SEARCH_JITTER_MS+1) - SEARCH_JITTER_MS);

logger.log(Level.FINER,
Expand All @@ -246,19 +265,20 @@ public void register(final PVAChannel channel, final boolean now)
{
logger.log(Level.FINE, () -> "Register search for " + channel + (now ? " now" : " soon"));

final ClientChannelState old = channel.setState(ClientChannelState.SEARCHING);
if (old == ClientChannelState.SEARCHING)
logger.log(Level.WARNING, "Registering channel " + channel + " to be searched more than once ");
synchronized (this)
{
final ClientChannelState old = channel.setState(ClientChannelState.SEARCHING);
if (old == ClientChannelState.SEARCHING)
logger.log(Level.WARNING, "Registering channel " + channel + " to be searched more than once ");

final SearchedChannel sc = searched_channels.computeIfAbsent(channel.getCID(), id -> new SearchedChannel(channel));
final SearchedChannel sc = searched_channels.computeIfAbsent(channel.getCID(), id -> new SearchedChannel(channel));

synchronized (search_buckets)
{
int bucket = current_search_bucket.get();
if (!now)
bucket = (bucket + SEARCH_SOON_DELAY) % search_buckets.size();
bucket = (bucket + SEARCH_SOON_DELAY) % search_buckets.size();
search_buckets.get(bucket).add(sc);
}

// Jumpstart search instead of waiting up to ~1 second for current bucket to be handled
if (now)
timer.execute(this::runSearches);
Expand All @@ -268,17 +288,15 @@ public void register(final PVAChannel channel, final boolean now)
* @param channel_id
* @return {@link PVAChannel}, <code>null</code> when channel wasn't searched any more
*/
public PVAChannel unregister(final int channel_id)
public synchronized PVAChannel unregister(final int channel_id)
{
final SearchedChannel searched = searched_channels.remove(channel_id);
if (searched != null)
{
logger.log(Level.FINE, () -> "Unregister search for " + searched.channel.getName() + " " + channel_id);
// NOT removing `searched` from all `search_buckets`.
// Removal would be a slow, linear operation.
// `runSearches()` will drop the channel from `search_buckets`
// because it's no longer listed in `searched_channels`

// Remove `searched` from all `search_buckets`.
for (Set<SearchedChannel> bucket : search_buckets)
bucket.remove(searched);
return searched.channel;
}
return null;
Expand All @@ -288,7 +306,7 @@ public PVAChannel unregister(final int channel_id)
*
* <p>Resets their search counter so they're searched "real soon".
*/
public void boost()
public synchronized void boost()
{
for (SearchedChannel searched : searched_channels.values())
{
Expand All @@ -299,12 +317,9 @@ public void boost()
if (period == MIN_SEARCH_PERIOD)
{
logger.log(Level.FINE, () -> "Restart search for '" + searched.channel.getName() + "'");
synchronized (search_buckets)
{
final LinkedList<SearchedChannel> bucket = search_buckets.get(current_search_bucket.get());
if (! bucket.contains(searched))
bucket.add(searched);
}

final Set<SearchedChannel> bucket = search_buckets.get(current_search_bucket.get());
bucket.add(searched);
}
// Not sending search right now:
// search(channel);
Expand All @@ -322,17 +337,17 @@ public void boost()
@SuppressWarnings("unchecked")
private void runSearches()
{
// Determine current search bucket
final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size());
// Collect channels to be searched while sync'ed
to_search.clear();
synchronized (search_buckets)
synchronized (this)
{
// Determine current search bucket
final int current = current_search_bucket.getAndUpdate(i -> (i + 1) % search_buckets.size());
final LinkedList<SearchedChannel> bucket = search_buckets.get(current);
final Set<SearchedChannel> bucket = search_buckets.get(current);
logger.log(Level.FINEST, () -> "Search bucket " + current);

// Remove searched channels from the current bucket
SearchedChannel sc;
while ((sc = bucket.poll()) != null)
for (SearchedChannel sc : bucket)
{
if (sc.channel.getState() == ClientChannelState.SEARCHING &&
searched_channels.containsKey(sc.channel.getCID()))
Expand All @@ -349,8 +364,8 @@ private void runSearches()
// in case that search bucket is quite full
final int i_n = (current + period) % search_buckets.size();
final int i_n_n = (i_n + 1) % search_buckets.size();
final LinkedList<SearchedChannel> next = search_buckets.get(i_n);
final LinkedList<SearchedChannel> next_next = search_buckets.get(i_n_n);
final Set<SearchedChannel> next = search_buckets.get(i_n);
final Set<SearchedChannel> next_next = search_buckets.get(i_n_n);
if (i_n == current || i_n_n == current)
throw new IllegalStateException("Current, next and nextnext search indices for " + sc.channel + " are " +
current + ", " + i_n + ", " + i_n_n);
Expand All @@ -362,9 +377,9 @@ private void runSearches()
else
logger.log(Level.FINE, "Dropping channel from search: " + sc.channel);
}
bucket.clear();
}


// Search batch..
// Size of a search request is close to 50 bytes
// plus { int cid, string name } for each channel.
Expand Down Expand Up @@ -439,8 +454,8 @@ private void search(final Collection<SearchRequest.Channel> channels)
// This is configured in EPICS_PVA_NAME_SERVERS via prefix pvas://
final ClientTCPHandler tcp = tcp_provider.apply(name_server.getAddress(), name_server.isTLS());

// In case of connection errors (TCP connection blocked by firewall),
// tcp will be null
// In older implementation, tcp was null in case of connection errors (TCP connection blocked by firewall).
// No longer expected to happen but check anyway
if (tcp != null)
{
final RequestEncoder search_request = (version, buffer) ->
Expand Down Expand Up @@ -526,8 +541,10 @@ private void sendSearch(final int seq, final Collection<SearchRequest.Channel> c
/** Stop searching channels */
public void close()
{
searched_channels.clear();

synchronized (this)
{
searched_channels.clear();
}
timer.shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected boolean initializeSocket()
socket.setKeepAlive(true);
}
catch (Exception ex)
{
{
logger.log(Level.WARNING, "PVA client cannot connect to " + server_address, ex);
return false;
}
Expand All @@ -157,6 +157,13 @@ protected boolean initializeSocket()
return true;
}

@Override
public InetSocketAddress getRemoteAddress()
{
// socket may not be connected or null, return address to which we want to connect
return new InetSocketAddress(server_address.getAddress(), server_address.getPort());
}

/** @return Client context */
PVAClient getClient()
{
Expand Down
29 changes: 10 additions & 19 deletions core/pva/src/main/java/org/epics/pva/common/TCPHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ public TCPHandler(final boolean client_mode)
*/
abstract protected boolean initializeSocket();

/** @return Remote address of the TCP socket */
abstract public InetSocketAddress getRemoteAddress();

/** Start receiving data
* To be called by Client/ServerTCPHandler when fully constructed
*/
Expand All @@ -167,12 +170,6 @@ protected void startSender() throws Exception
throw new Exception("Send thread already running");
}

/** @return Remote address of this end of the TCP socket */
public InetSocketAddress getRemoteAddress()
{
return new InetSocketAddress(socket.getInetAddress(), socket.getPort());
}

/** @return Is the send queue idle/empty? */
protected boolean isSendQueueIdle()
{
Expand Down Expand Up @@ -269,20 +266,14 @@ protected void send(final ByteBuffer buffer) throws Exception
/** Receiver */
private Void receiver()
{
// Establish connection
Thread.currentThread().setName("TCP receiver");
while (! initializeSocket())
try
{ // Delay for (another) connection timeout, at least 1 sec
Thread.sleep(Math.max(1, PVASettings.EPICS_PVA_TCP_SOCKET_TMO) * 1000);
}
catch (Exception ignore)
{
// NOP
}
// Listen on the connection
try
{
// Establish connection
Thread.currentThread().setName("TCP receiver");
if (! initializeSocket())
return null;

// Listen on the connection
Thread.currentThread().setName("TCP receiver " + socket.getLocalSocketAddress());
logger.log(Level.FINER, () -> Thread.currentThread().getName() + " started for " + socket.getRemoteSocketAddress());
logger.log(Level.FINER, "Native byte order " + receive_buffer.order());
Expand Down Expand Up @@ -348,8 +339,8 @@ private Void receiver()
}
finally
{
onReceiverExited(running);
logger.log(Level.FINER, Thread.currentThread().getName() + " done.");
onReceiverExited(running);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ protected boolean initializeSocket()
return true;
}

@Override
public InetSocketAddress getRemoteAddress()
{
return new InetSocketAddress(socket.getInetAddress(), socket.getPort());
}

PVAServer getServer()
{
return server;
Expand Down