diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/error/ErrorCode.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/error/ErrorCode.java
index 3127b2b372..3302df2b16 100644
--- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/error/ErrorCode.java
+++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/error/ErrorCode.java
@@ -152,4 +152,10 @@ public class ErrorCode
public static final int INDEX_UPDATE_SINGLE_POINT_INDEX_FAIL = ERROR_INDEX_SERVER + 17;
public static final int INDEX_PURGE_MAIN_INDEX_FAIL = ERROR_INDEX_SERVER + 18;
public static final int INDEX_PURGE_SINGLE_POINT_FAIL = ERROR_INDEX_SERVER + 19;
+
+ // error code for node service
+ private static final int ERROR_NODE_SERVER = ERROR_BASE + 700;
+ public static final int NODE_RETINA_INFO_FAIL = ERROR_NODE_SERVER + 1;
+ public static final int NODE_NO_AVAILABLE = ERROR_NODE_SERVER + 2;
+ public static final int NODE_INVALID_BUCKET = ERROR_NODE_SERVER + 3;
}
diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/NodeService.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/NodeService.java
new file mode 100644
index 0000000000..84e94fb9d1
--- /dev/null
+++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/node/NodeService.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.common.node;
+
+import com.google.protobuf.Empty;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.pixelsdb.pixels.common.server.HostAddress;
+import io.pixelsdb.pixels.common.utils.ConfigFactory;
+import io.pixelsdb.pixels.common.utils.ShutdownHookManager;
+import io.pixelsdb.pixels.daemon.NodeProto;
+import io.pixelsdb.pixels.daemon.NodeServiceGrpc;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+public class NodeService
+{
+ private static final Logger logger = LogManager.getLogger(NodeService.class);
+
+ private static final NodeService defaultInstance;
+ private static final Map otherInstances = new ConcurrentHashMap<>();
+
+ static
+ {
+ String host = ConfigFactory.Instance().getProperty("node.server.host");
+ int port = Integer.parseInt(ConfigFactory.Instance().getProperty("node.server.port"));
+
+ defaultInstance = new NodeService(host, port);
+
+ ShutdownHookManager.Instance().registerShutdownHook(NodeService.class, false, () ->
+ {
+ try
+ {
+ defaultInstance.shutdown();
+ for (NodeService client : otherInstances.values())
+ {
+ client.shutdown();
+ }
+ otherInstances.clear();
+ } catch (InterruptedException e)
+ {
+ logger.error("Failed to shutdown NodeService", e);
+ }
+ });
+ }
+
+ private final ManagedChannel channel;
+ private final NodeServiceGrpc.NodeServiceBlockingStub stub;
+ private volatile boolean isShutDown;
+ private NodeService(String host, int port)
+ {
+ assert host != null;
+ assert port > 0 && port <= 65535;
+
+ this.channel = ManagedChannelBuilder.forAddress(host, port)
+ .usePlaintext()
+ .build();
+
+ this.stub = NodeServiceGrpc.newBlockingStub(channel);
+ this.isShutDown = false;
+ }
+
+ public static NodeService Instance()
+ {
+ return defaultInstance;
+ }
+
+ public static synchronized NodeService CreateInstance(String host, int port)
+ {
+ HostAddress address = HostAddress.fromParts(host, port);
+ NodeService client = otherInstances.get(address);
+ if (client != null)
+ {
+ return client;
+ }
+ client = new NodeService(host, port);
+ otherInstances.put(address, client);
+ return client;
+ }
+
+ private synchronized void shutdown() throws InterruptedException
+ {
+ if (!this.isShutDown)
+ {
+ this.channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ this.isShutDown = true;
+ }
+ }
+
+ public List getRetinaList()
+ {
+ NodeProto.GetRetinaListResponse resp;
+
+ try
+ {
+ resp = stub.getRetinaList(Empty.getDefaultInstance());
+ } catch (Exception e)
+ {
+ logger.error("Failed to call GetRetinaList", e);
+ throw e;
+ }
+
+ if (resp.getErrorCode() != 0)
+ {
+ logger.error("GetRetinaList returned error code {}", resp.getErrorCode());
+ return Collections.emptyList();
+ }
+
+ return resp.getNodesList();
+ }
+
+ public NodeProto.NodeInfo getRetinaByBucket(int bucketId)
+ {
+ NodeProto.GetRetinaByBucketRequest req =
+ NodeProto.GetRetinaByBucketRequest.newBuilder()
+ .setBucket(bucketId)
+ .build();
+
+ NodeProto.GetRetinaByBucketResponse resp;
+
+ try
+ {
+ resp = stub.getRetinaByBucket(req);
+ } catch (Exception e)
+ {
+ logger.error("Failed to call GetRetinaByBucket", e);
+ throw e;
+ }
+
+ if (resp.getErrorCode() != 0)
+ {
+ logger.error("GetRetinaByBucket returned error={}", resp.getErrorCode());
+ return null; // or throw exception
+ }
+
+ return resp.getNode();
+ }
+}
diff --git a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java
index cdca3ccea3..7f2a6db759 100644
--- a/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java
+++ b/pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java
@@ -51,6 +51,7 @@ public final class Constants
public static final int HOSTNAME_INDEX_IN_CACHE_LOCATION_LITERAL = 3;
public static final String HEARTBEAT_COORDINATOR_LITERAL = "heartbeat_coordinator_";
public static final String HEARTBEAT_WORKER_LITERAL = "heartbeat_worker_";
+ public static final String HEARTBEAT_RETINA_LITERAL = "heartbeat_retina_";
public static final String CACHE_EXPAND_OR_SHRINK_LITERAL = "cache_expand_or_shrink"; // expand or shrink;1:expand, 2:shrink
public static final String PARTITION_OPERATOR_NAME = "partition";
diff --git a/pixels-common/src/main/resources/pixels.properties b/pixels-common/src/main/resources/pixels.properties
index 3b60c89a7d..b4f42cbc52 100644
--- a/pixels-common/src/main/resources/pixels.properties
+++ b/pixels-common/src/main/resources/pixels.properties
@@ -68,6 +68,13 @@ heartbeat.lease.ttl.seconds=20
# heartbeat period must be larger than 0
heartbeat.period.seconds=10
+###### pixels-node settings ######
+node.server.port=18891
+node.server.host=localhost
+# number of virtual nodes per physical node (used in consistent hashing)
+node.virtual.num=16
+node.bucket.num=1024
+
###### storage engine settings ######
### pixels reader, writer, and compactor settings ###
diff --git a/pixels-common/src/test/java/io/pixelsdb/pixels/common/node/TestNodeService.java b/pixels-common/src/test/java/io/pixelsdb/pixels/common/node/TestNodeService.java
new file mode 100644
index 0000000000..259382b80f
--- /dev/null
+++ b/pixels-common/src/test/java/io/pixelsdb/pixels/common/node/TestNodeService.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.common.node;
+
+import io.pixelsdb.pixels.daemon.NodeProto;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class TestNodeService
+{
+ private static final Logger logger = LoggerFactory.getLogger(TestNodeService.class);
+ @Test
+ public void testGetRetinaList()
+ {
+ NodeService nodeService = NodeService.Instance();
+ List retinaList = nodeService.getRetinaList();
+ logger.debug("Retina List Size: {}", retinaList.size());
+ for(NodeProto.NodeInfo nodeInfo : retinaList)
+ {
+ logger.debug(nodeInfo.toString());
+ }
+ }
+
+ @Test
+ public void testGetRetinaByBucketId()
+ {
+ NodeService nodeService = NodeService.Instance();
+ NodeProto.NodeInfo retinaByBucket = nodeService.getRetinaByBucket(1);
+ logger.info("Retina By Bucket: {}", retinaByBucket);
+ }
+}
diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/DaemonMain.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/DaemonMain.java
index 4e9dc4a055..c2ca3b9e34 100644
--- a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/DaemonMain.java
+++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/DaemonMain.java
@@ -5,6 +5,7 @@
import io.pixelsdb.pixels.daemon.cache.CacheCoordinator;
import io.pixelsdb.pixels.daemon.cache.CacheWorker;
import io.pixelsdb.pixels.daemon.index.IndexServer;
+import io.pixelsdb.pixels.daemon.node.NodeServer;
import io.pixelsdb.pixels.daemon.retina.RetinaServer;
import io.pixelsdb.pixels.daemon.exception.NoSuchServerException;
import io.pixelsdb.pixels.daemon.heartbeat.HeartbeatCoordinator;
@@ -33,18 +34,27 @@ public class DaemonMain
public static void main(String[] args)
{
- String role = System.getProperty("role");
+ String roleStr = System.getProperty("role");
String operation = System.getProperty("operation");
- if (role == null || operation == null)
+ if (roleStr == null || operation == null)
{
System.err.println("Run with -Doperation={start|stop} -Drole={coordinator|worker|retina}");
System.exit(1);
}
- if ((!role.equalsIgnoreCase("coordinator") && !role.equalsIgnoreCase("worker") && !role.equalsIgnoreCase("retina")) ||
- (!operation.equalsIgnoreCase("start") && !operation.equalsIgnoreCase("stop")))
- {
+ NodeProto.NodeRole role;
+ try {
+ role = NodeProto.NodeRole.valueOf(roleStr.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ System.err.println("Invalid role: " + roleStr);
+ System.err.println("Run with -Doperation={start|stop} -Drole={coordinator|worker|retina}");
+ System.exit(1);
+ return;
+ }
+
+ if (!operation.equalsIgnoreCase("start") && !operation.equalsIgnoreCase("stop")) {
+ System.err.println("Invalid operation: " + operation);
System.err.println("Run with -Doperation={start|stop} -Drole={coordinator|worker|retina}");
System.exit(1);
}
@@ -54,12 +64,12 @@ public static void main(String[] args)
{
varDir += "/";
}
- String lockFile = varDir + "pixels." + role.toLowerCase() + ".lock";
+ String lockFile = varDir + "pixels." + roleStr.toLowerCase() + ".lock";
if (operation.equalsIgnoreCase("start"))
{
// this is the main daemon.
- System.out.println("Starting daemon of " + role + "...");
+ System.out.println("Starting daemon of " + roleStr + "...");
Daemon mainDaemon = new Daemon();
mainDaemon.setup(lockFile);
@@ -80,12 +90,13 @@ public static void main(String[] args)
boolean sinkServerEnabled = Boolean.parseBoolean(config.getProperty("sink.server.enabled"));
boolean indexServerEnabled = Boolean.parseBoolean(config.getProperty("index.server.enabled"));
- if (role.equalsIgnoreCase("coordinator"))
+ if (role.equals(NodeProto.NodeRole.COORDINATOR))
{
int metadataServerPort = Integer.parseInt(config.getProperty("metadata.server.port"));
int transServerPort = Integer.parseInt(config.getProperty("trans.server.port"));
int queryScheduleServerPort = Integer.parseInt(config.getProperty("query.schedule.server.port"));
int scalingMetricsServerPort = Integer.parseInt(config.getProperty("scaling.metrics.server.port"));
+ int nodeServerPort = Integer.parseInt(config.getProperty("node.server.port"));
try
{
@@ -101,6 +112,9 @@ public static void main(String[] args)
// start query schedule server
QueryScheduleServer queryScheduleServer = new QueryScheduleServer(queryScheduleServerPort);
container.addServer("query_schedule", queryScheduleServer);
+ // start node server
+ NodeServer nodeServer = new NodeServer(nodeServerPort);
+ container.addServer("node", nodeServer);
if (autoScalingEnabled)
{
@@ -121,7 +135,7 @@ public static void main(String[] args)
log.error("failed to start coordinator", e);
}
}
- else if (role.equalsIgnoreCase("worker"))
+ else if (role.equals(NodeProto.NodeRole.WORKER))
{
boolean metricsServerEnabled = Boolean.parseBoolean(
ConfigFactory.Instance().getProperty("metrics.server.enabled"));
@@ -129,7 +143,7 @@ else if (role.equalsIgnoreCase("worker"))
try
{
// start heartbeat worker
- HeartbeatWorker heartbeatWorker = new HeartbeatWorker();
+ HeartbeatWorker heartbeatWorker = new HeartbeatWorker(role);
container.addServer("heartbeat_worker", heartbeatWorker);
// start metrics server and cache worker on worker node
if (metricsServerEnabled)
@@ -148,10 +162,14 @@ else if (role.equalsIgnoreCase("worker"))
log.error("failed to start worker", e);
}
}
- else
+ else if(role.equals(NodeProto.NodeRole.RETINA))
{
int retinaServerPort = Integer.parseInt(config.getProperty("retina.server.port"));
+ // start heartbeat worker
+ HeartbeatWorker heartbeatWorker = new HeartbeatWorker(role);
+ container.addServer("heartbeat_worker", heartbeatWorker);
+
try
{
// start retina server on worker node
@@ -244,7 +262,7 @@ else if (role.equalsIgnoreCase("worker"))
}
catch (Throwable e)
{
- log.error("error in the main loop of pixels daemon of {}", role, e);
+ log.error("error in the main loop of pixels daemon of {}", roleStr, e);
break;
}
}
@@ -252,7 +270,7 @@ else if (role.equalsIgnoreCase("worker"))
}
else
{
- System.out.println("Stopping daemon of " + role + "...");
+ System.out.println("Stopping daemon of " + roleStr + "...");
try
{
@@ -275,7 +293,7 @@ else if (role.equalsIgnoreCase("worker"))
// get the role name of the target daemon (to be killing).
for (int i = 2; i < splits.length; ++i)
{
- if (splits[i].contains("-Drole=" + role))
+ if (splits[i].contains("-Drole=" + roleStr))
{
roleFound = true;
}
@@ -291,7 +309,7 @@ else if (role.equalsIgnoreCase("worker"))
int pid = Integer.parseInt(splits[0]);
if (roleFound && isStartOperation)
{
- System.out.println("Killing " + role + ", pid (" + pid + ")");
+ System.out.println("Killing " + roleStr + ", pid (" + pid + ")");
// Terminate the daemon gracefully by sending SIGTERM(15) signal.
Runtime.getRuntime().exec("kill -15 " + pid);
}
@@ -302,7 +320,7 @@ else if (role.equalsIgnoreCase("worker"))
}
catch (IOException e)
{
- log.error("error when stopping pixels daemon of {}", role, e);
+ log.error("error when stopping pixels daemon of {}", roleStr, e);
}
}
}
diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/heartbeat/HeartbeatWorker.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/heartbeat/HeartbeatWorker.java
index 8b1f78cdf7..f5629d4579 100644
--- a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/heartbeat/HeartbeatWorker.java
+++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/heartbeat/HeartbeatWorker.java
@@ -24,6 +24,7 @@
import io.pixelsdb.pixels.common.server.Server;
import io.pixelsdb.pixels.common.utils.Constants;
import io.pixelsdb.pixels.common.utils.EtcdUtil;
+import io.pixelsdb.pixels.daemon.NodeProto;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -45,13 +46,15 @@ public class HeartbeatWorker implements Server
private static final AtomicInteger currentStatus = new AtomicInteger(NodeStatus.READY.StatusCode);
private final HeartbeatConfig heartbeatConfig = new HeartbeatConfig();
private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+ private final NodeProto.NodeRole role;
private String hostName;
private WorkerRegister workerRegister;
private boolean initializeSuccess = false;
private CountDownLatch runningLatch;
- public HeartbeatWorker()
+ public HeartbeatWorker(NodeProto.NodeRole role)
{
+ this.role = role;
this.hostName = System.getenv("HOSTNAME");
logger.debug("HostName from system env: {}", hostName);
if (hostName == null)
@@ -91,7 +94,18 @@ private void initialize()
// 2. register the worker
Lease leaseClient = EtcdUtil.Instance().getClient().getLeaseClient();
long leaseId = leaseClient.grant(heartbeatConfig.getNodeLeaseTTL()).get(10, TimeUnit.SECONDS).getID();
- String key = Constants.HEARTBEAT_WORKER_LITERAL + hostName;
+ String key;
+ switch (role)
+ {
+ case WORKER:
+ key = Constants.HEARTBEAT_WORKER_LITERAL + hostName;
+ break;
+ case RETINA:
+ key = Constants.HEARTBEAT_RETINA_LITERAL + hostName;
+ break;
+ default:
+ throw new IllegalStateException("Unknown heartbeat role: " + role);
+ }
EtcdUtil.Instance().putKeyValueWithLeaseId(key, String.valueOf(currentStatus.get()), leaseId);
// start a scheduled thread to update node status periodically
this.workerRegister = new WorkerRegister(key, leaseClient, leaseId);
@@ -123,7 +137,18 @@ public void shutdown()
{
workerRegister.stop();
}
- EtcdUtil.Instance().deleteByPrefix(Constants.HEARTBEAT_WORKER_LITERAL);
+ switch (role)
+ {
+ case WORKER:
+ EtcdUtil.Instance().deleteByPrefix(Constants.HEARTBEAT_WORKER_LITERAL);
+ break;
+ case RETINA:
+ EtcdUtil.Instance().deleteByPrefix(Constants.HEARTBEAT_RETINA_LITERAL);
+ break;
+ default:
+ throw new IllegalStateException("Unknown heartbeat role: " + role);
+ }
+
if (runningLatch != null)
{
runningLatch.countDown();
diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServer.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServer.java
new file mode 100644
index 0000000000..f889abb5d0
--- /dev/null
+++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServer.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.daemon.node;
+
+import io.grpc.ServerBuilder;
+import io.pixelsdb.pixels.common.server.Server;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+public class NodeServer implements Server
+{
+ private static final Logger log = LogManager.getLogger(NodeServer.class);
+
+ private boolean running = false;
+ private final io.grpc.Server rpcServer;
+
+ public NodeServer(int port)
+ {
+ assert (port > 0 && port <= 65535);
+ this.rpcServer = ServerBuilder.forPort(port)
+ .addService(new NodeServiceImpl()).build();
+ }
+
+ @Override
+ public boolean isRunning()
+ {
+ return this.running;
+ }
+
+ @Override
+ public void shutdown()
+ {
+ this.running = false;
+ try
+ {
+ this.rpcServer.shutdown().awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e)
+ {
+ log.error("Interrupted when shutdown node server.", e);
+ }
+ }
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ this.rpcServer.start();
+ this.running = true;
+ this.rpcServer.awaitTermination();
+ } catch (IOException e)
+ {
+ log.error("I/O error when running.", e);
+ } catch (InterruptedException e)
+ {
+ log.error("Interrupted when running.", e);
+ } finally
+ {
+ this.shutdown();
+ }
+ }
+}
diff --git a/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServiceImpl.java b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServiceImpl.java
new file mode 100644
index 0000000000..b833bdc1e9
--- /dev/null
+++ b/pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServiceImpl.java
@@ -0,0 +1,294 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+package io.pixelsdb.pixels.daemon.node;
+
+import com.google.protobuf.Empty;
+import io.etcd.jetcd.KeyValue;
+import io.grpc.stub.StreamObserver;
+import io.pixelsdb.pixels.common.error.ErrorCode;
+import io.pixelsdb.pixels.common.utils.ConfigFactory;
+import io.pixelsdb.pixels.common.utils.Constants;
+import io.pixelsdb.pixels.common.utils.EtcdUtil;
+import io.pixelsdb.pixels.daemon.NodeProto;
+import io.pixelsdb.pixels.daemon.NodeServiceGrpc;
+import io.pixelsdb.pixels.daemon.heartbeat.HeartbeatConfig;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * gRPC implementation of NodeService with dynamic consistent hashing.
+ * Supports adding/removing Retina nodes at runtime.
+ * Implementation is modified to use bucketNum as the number of virtual nodes,
+ * and assumes the client provides a pre-hashed key (bucket ID).
+ */
+public class NodeServiceImpl extends NodeServiceGrpc.NodeServiceImplBase
+{
+
+ private static final Logger logger = LogManager.getLogger(NodeServiceImpl.class);
+
+ private final int virtualNode;
+ private final int bucketNum;
+
+ // Consistent Hash Ring: Hash Value -> NodeInfo (using TreeMap for SortedMap)
+ // The keys in this map will range from 0 to bucketNum - 1.
+ private final SortedMap hashRing = new TreeMap<>();
+
+ // Lock for thread safety during dynamic updates
+ private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ private final HeartbeatConfig heartbeatConfig;
+ /**
+ * Watch Etcd heartbeat changes and dynamically update hash ring.
+ */
+ private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+
+ public NodeServiceImpl()
+ {
+ ConfigFactory config = ConfigFactory.Instance();
+ virtualNode = Integer.parseInt(config.getProperty("node.virtual.num"));
+ // bucketNum is the total number of hash points
+ bucketNum = Integer.parseInt(config.getProperty("node.bucket.num"));
+ this.heartbeatConfig = new HeartbeatConfig();
+ // Initial load from Etcd
+ reloadRetinaNodesFromEtcd();
+ startEtcdWatcher();
+ }
+
+ private String getAddressFromKV(KeyValue kv)
+ {
+ return kv.getKey()
+ .toString(StandardCharsets.UTF_8).substring(Constants.HEARTBEAT_RETINA_LITERAL.length());
+ }
+
+ /**
+ * Load nodes from Etcd and build hash ring.
+ */
+ private void reloadRetinaNodesFromEtcd()
+ {
+ List retinaNodes = EtcdUtil.Instance()
+ .getKeyValuesByPrefix(Constants.HEARTBEAT_RETINA_LITERAL);
+ lock.writeLock().lock();
+ try
+ {
+ hashRing.clear();
+ for (KeyValue kv : retinaNodes)
+ {
+ NodeProto.NodeInfo node = NodeProto.NodeInfo.newBuilder()
+ .setAddress(getAddressFromKV(kv))
+ .setRole(NodeProto.NodeRole.RETINA)
+ .build();
+ addNodeInternal(node);
+ }
+ logger.info("Initial hash ring loaded with {} physical nodes and {} virtual nodes.",
+ hashRing.values().stream().distinct().count(), bucketNum);
+ } finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void startEtcdWatcher()
+ {
+ Runnable watchTask = () ->
+ {
+ try
+ {
+ List currentNodes = EtcdUtil.Instance()
+ .getKeyValuesByPrefix(Constants.HEARTBEAT_RETINA_LITERAL);
+ updateHashRing(currentNodes);
+ } catch (Exception e)
+ {
+ logger.error("Error watching Etcd changes", e);
+ }
+ };
+
+ // Schedule the task at fixed rate
+ scheduledExecutor.scheduleAtFixedRate(
+ watchTask,
+ heartbeatConfig.getNodeHeartbeatPeriod(), // initial delay
+ heartbeatConfig.getNodeHeartbeatPeriod(), // period in seconds
+ TimeUnit.SECONDS
+ );
+ }
+
+ // Removed the non-consistent-hashing updateBucketMapping method.
+
+ /**
+ * Update hash ring with current nodes (add new, remove missing).
+ */
+ private void updateHashRing(List currentNodes)
+ {
+ lock.writeLock().lock();
+ try
+ {
+ // 1. Collect current active addresses from Etcd
+ Set newAddresses = new HashSet<>();
+ for (KeyValue kv : currentNodes)
+ {
+ newAddresses.add(getAddressFromKV(kv));
+ }
+
+ // 2. Collect addresses currently on the ring
+ Set existingAddresses = new HashSet<>();
+ for (NodeProto.NodeInfo node : new TreeSet<>(hashRing.values()))
+ {
+ existingAddresses.add(node.getAddress());
+ }
+
+ // 3. Remove nodes not present anymore (existingAddresses - newAddresses)
+ for (NodeProto.NodeInfo node : new TreeSet<>(hashRing.values()))
+ {
+ if (!newAddresses.contains(node.getAddress()))
+ {
+ removeNodeInternal(node);
+ logger.warn("Removed node from hash ring: " + node.getAddress());
+ }
+ }
+
+ // 4. Add new nodes (newAddresses - existingAddresses)
+ for (String newAddr : newAddresses)
+ {
+ if (!existingAddresses.contains(newAddr))
+ {
+ NodeProto.NodeInfo node = NodeProto.NodeInfo.newBuilder()
+ .setAddress(newAddr)
+ .setRole(NodeProto.NodeRole.RETINA)
+ .build();
+ addNodeInternal(node);
+ logger.info("Added node to hash ring: " + newAddr);
+ }
+ }
+ } finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Add a node to the hash ring internally, mapping to the fixed hash points [0, bucketNum-1].
+ */
+ private void addNodeInternal(NodeProto.NodeInfo node)
+ {
+ for (int i = 0; i < virtualNode; i++)
+ {
+ int hashPoint = hash(node.getAddress() + "#" + i) % bucketNum;
+ hashRing.put(hashPoint, node);
+ }
+ }
+
+ /**
+ * Remove a node from the hash ring internally.
+ */
+ private void removeNodeInternal(NodeProto.NodeInfo node)
+ {
+ // Recalculate and remove all virtual nodes
+ for (int i = 0; i < bucketNum; i++)
+ {
+ int hashPoint = hash(node.getAddress() + "#" + i) % bucketNum;
+ hashRing.remove(hashPoint);
+ }
+ // Note: The removal above is technically incomplete if multiple virtual nodes map to the same hashPoint,
+ // but given the requirement, we assume hash() provides a reasonably uniform spread.
+ }
+
+ /**
+ * Simple hash function for consistent hashing.
+ */
+ private int hash(String key)
+ {
+ return key.hashCode();
+ }
+
+ /**
+ * RPC: Get all retina nodes.
+ * (No change, uses the hashRing values)
+ */
+ @Override
+ public void getRetinaList(Empty request,
+ StreamObserver responseObserver)
+ {
+ NodeProto.GetRetinaListResponse.Builder responseBuilder = NodeProto.GetRetinaListResponse.newBuilder();
+ lock.readLock().lock();
+ try
+ {
+ hashRing.values().stream().distinct().forEach(responseBuilder::addNodes);
+ responseBuilder.setErrorCode(ErrorCode.SUCCESS);
+ } catch (Exception e)
+ {
+ responseBuilder.setErrorCode(ErrorCode.NODE_RETINA_INFO_FAIL);
+ logger.error("Failed to retrieve retina nodes", e);
+ } finally
+ {
+ lock.readLock().unlock();
+ }
+ responseObserver.onNext(responseBuilder.build());
+ responseObserver.onCompleted();
+ }
+
+ /**
+ * RPC: Get retina node by bucket id using consistent hashing.
+ * MODIFIED: Uses the input 'bucket' (which is the pre-hashed key/virtual node ID) directly as the lookup point.
+ */
+ @Override
+ public void getRetinaByBucket(NodeProto.GetRetinaByBucketRequest request,
+ StreamObserver responseObserver)
+ {
+ NodeProto.GetRetinaByBucketResponse.Builder responseBuilder = NodeProto.GetRetinaByBucketResponse.newBuilder();
+ lock.readLock().lock();
+ try
+ {
+ if (hashRing.isEmpty())
+ {
+ responseBuilder.setErrorCode(ErrorCode.NODE_NO_AVAILABLE);
+ } else
+ {
+ int bucketId = request.getBucket();
+
+ if (bucketId < 0 || bucketId >= bucketNum)
+ {
+ logger.error("Invalid bucket hash received: " + bucketId);
+ responseBuilder.setErrorCode(ErrorCode.NODE_INVALID_BUCKET);
+ } else
+ {
+ SortedMap tail = hashRing.tailMap(bucketId);
+ int nodeHashPoint = tail.isEmpty() ? hashRing.firstKey() : tail.firstKey();
+ NodeProto.NodeInfo node = hashRing.get(nodeHashPoint);
+ responseBuilder.setErrorCode(ErrorCode.SUCCESS).setNode(node);
+ }
+ }
+ } catch (Exception e)
+ {
+ responseBuilder.setErrorCode(ErrorCode.NODE_RETINA_INFO_FAIL);
+ logger.error("Failed to get retina node by bucket", e);
+ } finally
+ {
+ lock.readLock().unlock();
+ }
+ responseObserver.onNext(responseBuilder.build());
+ responseObserver.onCompleted();
+ }
+}
diff --git a/pixels-daemon/src/test/java/io/pixelsdb/pixels/daemon/heartbeat/TestHeartbeatWorker.java b/pixels-daemon/src/test/java/io/pixelsdb/pixels/daemon/heartbeat/TestHeartbeatWorker.java
index 89d44d34a1..1d65267ecb 100644
--- a/pixels-daemon/src/test/java/io/pixelsdb/pixels/daemon/heartbeat/TestHeartbeatWorker.java
+++ b/pixels-daemon/src/test/java/io/pixelsdb/pixels/daemon/heartbeat/TestHeartbeatWorker.java
@@ -19,6 +19,7 @@
*/
package io.pixelsdb.pixels.daemon.heartbeat;
+import io.pixelsdb.pixels.daemon.NodeProto;
import org.junit.Test;
/**
@@ -30,7 +31,7 @@ public class TestHeartbeatWorker
@Test
public void test() throws InterruptedException
{
- HeartbeatWorker worker = new HeartbeatWorker();
+ HeartbeatWorker worker = new HeartbeatWorker(NodeProto.NodeRole.WORKER);
Thread thread = new Thread(worker);
thread.start();
thread.join();
diff --git a/proto/node.proto b/proto/node.proto
new file mode 100644
index 0000000000..7bff71328d
--- /dev/null
+++ b/proto/node.proto
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2025 PixelsDB.
+ *
+ * This file is part of Pixels.
+ *
+ * Pixels is free software: you can redistribute it and/or modify
+ * it under the terms of the Affero GNU General Public License as
+ * published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Pixels is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Affero GNU General Public License for more details.
+ *
+ * You should have received a copy of the Affero GNU General Public
+ * License along with Pixels. If not, see
+ * .
+ */
+
+syntax = "proto3";
+
+import "google/protobuf/empty.proto";
+
+option java_multiple_files = false;
+option java_package = "io.pixelsdb.pixels.daemon";
+option java_outer_classname = "NodeProto";
+
+package node.proto;
+
+// The node management services definition.
+service NodeService {
+ rpc GetRetinaList(google.protobuf.Empty) returns (GetRetinaListResponse);
+ rpc GetRetinaByBucket(GetRetinaByBucketRequest) returns (GetRetinaByBucketResponse);
+}
+
+enum NodeRole {
+ COORDINATOR = 0;
+ WORKER = 1;
+ RETINA = 2;
+}
+
+message NodeInfo {
+ string address = 1;
+ uint32 port = 2;
+ NodeRole role = 3;
+}
+
+message GetRetinaListResponse {
+ int32 errorCode = 1;
+ repeated NodeInfo nodes = 2;
+}
+
+message GetRetinaByBucketRequest {
+ int32 bucket = 1;
+}
+
+message GetRetinaByBucketResponse {
+ int32 errorCode = 1;
+ NodeInfo node = 2;
+}