[Issue #1215] Implement NodeService, Retina Node Heartbeat, and Consistent Hashing#1216
bianhq merged 3 commits into pixelsdb:master
Pull request overview
This pull request implements a NodeService with consistent hashing for managing Retina nodes, adds heartbeat functionality for Retina nodes, and integrates these components into the daemon architecture.
- Introduces gRPC-based NodeService with dynamic consistent hashing for Retina node discovery and routing
- Extends HeartbeatWorker to support Retina nodes in addition to Worker nodes
- Adds NodeServer to the coordinator daemon for centralized node management
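For readers unfamiliar with the technique, the routing idea summarized above can be sketched roughly as follows. This is a minimal illustration, not the code in NodeServiceImpl: the class name `HashRingSketch`, the CRC32 hash, the `address#i` virtual-node naming, and the node names are all assumptions made for the example. Only the two parameters (virtual nodes per Retina node, total bucket count) and the lookup by bucket ID come from the pull request.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

/** Hypothetical sketch of a consistent hash ring; not the actual NodeServiceImpl. */
final class HashRingSketch {
    private final int virtualNode; // virtual nodes placed on the ring per physical node
    private final int bucketNum;   // total number of positions (buckets) on the ring
    // Ring position -> node address, kept sorted so the next position clockwise is cheap to find.
    private final TreeMap<Integer, String> ring = new TreeMap<>();

    HashRingSketch(int virtualNode, int bucketNum) {
        if (virtualNode <= 0 || bucketNum <= 0 || virtualNode > bucketNum) {
            throw new IllegalArgumentException("need 0 < virtualNode <= bucketNum");
        }
        this.virtualNode = virtualNode;
        this.bucketNum = bucketNum;
    }

    // Maps a virtual-node label to a ring position in [0, bucketNum). CRC32 is an assumed choice.
    private int position(String label) {
        CRC32 crc = new CRC32();
        crc.update(label.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % bucketNum);
    }

    void addNode(String address) {
        for (int i = 0; i < virtualNode; i++) {
            // On a position collision the earlier node keeps the slot (a simplification).
            ring.putIfAbsent(position(address + "#" + i), address);
        }
    }

    void removeNode(String address) {
        ring.values().removeIf(address::equals);
    }

    /** Returns the node owning the bucket, or null if the ring is empty. */
    String getNodeByBucket(int bucketId) {
        if (bucketId < 0 || bucketId >= bucketNum) {
            throw new IllegalArgumentException("bucketId must be in [0, " + (bucketNum - 1) + "]");
        }
        if (ring.isEmpty()) {
            return null;
        }
        // First virtual node at or after the bucket; wrap around to the start of the ring.
        Map.Entry<Integer, String> entry = ring.ceilingEntry(bucketId);
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        HashRingSketch ring = new HashRingSketch(16, 1024);
        ring.addNode("retina-a");
        ring.addNode("retina-b");
        System.out.println("bucket 5 -> " + ring.getNodeByBucket(5));
    }
}
```

When a node is removed, only the buckets that mapped to its virtual nodes move to the next node on the ring, which is the property that makes this scheme attractive for nodes joining and leaving at runtime.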
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 21 comments.
| File | Description |
|---|---|
| proto/node.proto | Defines gRPC service and messages for node management operations including GetRetinaList and GetRetinaByBucket |
| pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServiceImpl.java | Implements NodeService with consistent hashing ring, etcd-based node discovery, and dynamic node add/remove |
| pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/node/NodeServer.java | Creates gRPC server wrapper for NodeService |
| pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/heartbeat/HeartbeatWorker.java | Extends heartbeat support to Retina nodes using role-based etcd key registration |
| pixels-daemon/src/main/java/io/pixelsdb/pixels/daemon/DaemonMain.java | Integrates NodeServer into coordinator startup and uses NodeRole enum for type-safe role handling |
| pixels-common/src/main/java/io/pixelsdb/pixels/common/node/NodeService.java | Provides client-side gRPC stub for communicating with NodeService |
| pixels-common/src/main/java/io/pixelsdb/pixels/common/utils/Constants.java | Adds HEARTBEAT_RETINA_LITERAL constant for etcd key prefixing |
| pixels-common/src/main/java/io/pixelsdb/pixels/common/error/ErrorCode.java | Defines error codes for node service operations |
| pixels-common/src/main/resources/pixels.properties | Adds configuration properties for node server port, virtual nodes, and bucket count |
| pixels-common/src/test/java/io/pixelsdb/pixels/common/node/TestNodeService.java | Provides basic tests for NodeService client operations |
| pixels-daemon/src/test/java/io/pixelsdb/pixels/daemon/heartbeat/TestHeartbeatWorker.java | Updates test to use NodeRole parameter for HeartbeatWorker |
| ConfigFactory config = ConfigFactory.Instance(); | ||
| virtualNode = Integer.parseInt(config.getProperty("node.virtual.num")); | ||
| // bucketNum is the total number of hash points | ||
| bucketNum = Integer.parseInt(config.getProperty("node.bucket.num")); |
Missing input validation for configuration properties. The constructor should validate that virtualNode and bucketNum are positive integers, and that virtualNode is less than or equal to bucketNum to ensure proper consistent hashing. Invalid configuration values could cause runtime errors or incorrect behavior.
| bucketNum = Integer.parseInt(config.getProperty("node.bucket.num")); | |
| bucketNum = Integer.parseInt(config.getProperty("node.bucket.num")); | |
| // Validate configuration values | |
| if (virtualNode <= 0) { | |
| throw new IllegalArgumentException("Configuration error: node.virtual.num must be a positive integer, got " + virtualNode); | |
| } | |
| if (bucketNum <= 0) { | |
| throw new IllegalArgumentException("Configuration error: node.bucket.num must be a positive integer, got " + bucketNum); | |
| } | |
| if (virtualNode > bucketNum) { | |
| throw new IllegalArgumentException("Configuration error: node.virtual.num (" + virtualNode + ") must be less than or equal to node.bucket.num (" + bucketNum + ")"); | |
| } |
| * gRPC implementation of NodeService with dynamic consistent hashing. | ||
| * Supports adding/removing Retina nodes at runtime. | ||
| * Implementation is modified to use bucketNum as the number of virtual nodes, | ||
| * and assumes the client provides a pre-hashed key (bucket ID). |
The class-level documentation on lines 43-44 says the class "assumes the client provides a pre-hashed key (bucket ID)", which is misleading. In the getRetinaByBucket method the client passes a bucket ID directly (0 to bucketNum-1), not a pre-hashed key. Consider updating the documentation to state that clients should provide a bucket ID in the range [0, bucketNum-1].
| * and assumes the client provides a pre-hashed key (bucket ID). | |
| * and assumes the client provides a bucket ID in the range [0, bucketNum-1]. |
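To make the distinction concrete, a client that starts from an arbitrary key would need to reduce it to a bucket ID itself before calling the service. The pull request does not say how clients derive bucket IDs, so the following is only one possible sketch: the class `BucketIdSketch`, the method `toBucketId`, and the use of `hashCode()` are assumptions for illustration.

```java
/** Hypothetical client-side helper; how real clients compute bucket IDs is not shown in this PR. */
final class BucketIdSketch {
    /** Reduces any key to a bucket ID in [0, bucketNum - 1]. */
    static int toBucketId(Object key, int bucketNum) {
        if (bucketNum <= 0) {
            throw new IllegalArgumentException("bucketNum must be positive, got " + bucketNum);
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), bucketNum);
    }

    public static void main(String[] args) {
        System.out.println(toBucketId("some-row-key", 1024));
    }
}
```

Using `Math.floorMod` rather than `%` matters here because `%` can return a negative value for a negative hash code, which would fall outside the documented range.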
| node.server.port=18891 | ||
| node.server.host=localhost | ||
| # virtual nodes num | ||
| node.virtual.num=16 |
[nitpick] Add a comment to explain what node.bucket.num represents. This should clarify that it's the total number of hash buckets in the consistent hash ring.
| node.virtual.num=16 | |
| node.virtual.num=16 | |
| # the total number of hash buckets in the consistent hash ring |
| int metadataServerPort = Integer.parseInt(config.getProperty("metadata.server.port")); | ||
| int transServerPort = Integer.parseInt(config.getProperty("trans.server.port")); | ||
| int queryScheduleServerPort = Integer.parseInt(config.getProperty("query.schedule.server.port")); | ||
| int scalingMetricsServerPort = Integer.parseInt(config.getProperty("scaling.metrics.server.port")); | ||
| int nodeServerPort = Integer.parseInt(config.getProperty("node.server.port")); |
Potential uncaught 'java.lang.NumberFormatException': Integer.parseInt throws it if any of these port properties is missing (null) or is not a valid integer.
| int metadataServerPort = Integer.parseInt(config.getProperty("metadata.server.port")); | |
| int transServerPort = Integer.parseInt(config.getProperty("trans.server.port")); | |
| int queryScheduleServerPort = Integer.parseInt(config.getProperty("query.schedule.server.port")); | |
| int scalingMetricsServerPort = Integer.parseInt(config.getProperty("scaling.metrics.server.port")); | |
| int nodeServerPort = Integer.parseInt(config.getProperty("node.server.port")); | |
| int metadataServerPort; | |
| int transServerPort; | |
| int queryScheduleServerPort; | |
| int scalingMetricsServerPort; | |
| int nodeServerPort; | |
| try { | |
| metadataServerPort = Integer.parseInt(config.getProperty("metadata.server.port")); | |
| } catch (NumberFormatException e) { | |
| log.error("Invalid value for metadata.server.port: " + config.getProperty("metadata.server.port"), e); | |
| throw e; | |
| } | |
| try { | |
| transServerPort = Integer.parseInt(config.getProperty("trans.server.port")); | |
| } catch (NumberFormatException e) { | |
| log.error("Invalid value for trans.server.port: " + config.getProperty("trans.server.port"), e); | |
| throw e; | |
| } | |
| try { | |
| queryScheduleServerPort = Integer.parseInt(config.getProperty("query.schedule.server.port")); | |
| } catch (NumberFormatException e) { | |
| log.error("Invalid value for query.schedule.server.port: " + config.getProperty("query.schedule.server.port"), e); | |
| throw e; | |
| } | |
| try { | |
| scalingMetricsServerPort = Integer.parseInt(config.getProperty("scaling.metrics.server.port")); | |
| } catch (NumberFormatException e) { | |
| log.error("Invalid value for scaling.metrics.server.port: " + config.getProperty("scaling.metrics.server.port"), e); | |
| throw e; | |
| } | |
| try { | |
| nodeServerPort = Integer.parseInt(config.getProperty("node.server.port")); | |
| } catch (NumberFormatException e) { | |
| log.error("Invalid value for node.server.port: " + config.getProperty("node.server.port"), e); | |
| throw e; | |
| } |
| virtualNode = Integer.parseInt(config.getProperty("node.virtual.num")); | ||
| // bucketNum is the total number of hash points | ||
| bucketNum = Integer.parseInt(config.getProperty("node.bucket.num")); |
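Potential uncaught 'java.lang.NumberFormatException': Integer.parseInt throws it if node.virtual.num or node.bucket.num is missing or not a valid integer. The suggestion below logs the bad value and fails fast with an IllegalArgumentException.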
| virtualNode = Integer.parseInt(config.getProperty("node.virtual.num")); | |
| // bucketNum is the total number of hash points | |
| bucketNum = Integer.parseInt(config.getProperty("node.bucket.num")); | |
| int tmpVirtualNode; | |
| int tmpBucketNum; | |
| try { | |
| tmpVirtualNode = Integer.parseInt(config.getProperty("node.virtual.num")); | |
| } catch (NumberFormatException e) { | |
| logger.error("Invalid value for 'node.virtual.num': {}", config.getProperty("node.virtual.num"), e); | |
| throw new IllegalArgumentException("Invalid configuration for 'node.virtual.num'", e); | |
| } | |
| // bucketNum is the total number of hash points | |
| try { | |
| tmpBucketNum = Integer.parseInt(config.getProperty("node.bucket.num")); | |
| } catch (NumberFormatException e) { | |
| logger.error("Invalid value for 'node.bucket.num': {}", config.getProperty("node.bucket.num"), e); | |
| throw new IllegalArgumentException("Invalid configuration for 'node.bucket.num'", e); | |
| } | |
| virtualNode = tmpVirtualNode; | |
| bucketNum = tmpBucketNum; |
| virtualNode = Integer.parseInt(config.getProperty("node.virtual.num")); | ||
| // bucketNum is the total number of hash points | ||
| bucketNum = Integer.parseInt(config.getProperty("node.bucket.num")); |
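Potential uncaught 'java.lang.NumberFormatException': Integer.parseInt throws it if node.virtual.num or node.bucket.num is missing or not a valid integer. The suggestion below logs the error and falls back to a default of 1 instead of failing.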
| virtualNode = Integer.parseInt(config.getProperty("node.virtual.num")); | |
| // bucketNum is the total number of hash points | |
| bucketNum = Integer.parseInt(config.getProperty("node.bucket.num")); | |
| int tmpVirtualNode = 1; | |
| try { | |
| tmpVirtualNode = Integer.parseInt(config.getProperty("node.virtual.num")); | |
| } catch (NumberFormatException e) { | |
| logger.error("Invalid value for node.virtual.num in config, using default: 1", e); | |
| } | |
| virtualNode = tmpVirtualNode; | |
| // bucketNum is the total number of hash points | |
| int tmpBucketNum = 1; | |
| try { | |
| tmpBucketNum = Integer.parseInt(config.getProperty("node.bucket.num")); | |
| } catch (NumberFormatException e) { | |
| logger.error("Invalid value for node.bucket.num in config, using default: 1", e); | |
| } | |
| bucketNum = tmpBucketNum; |
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>