Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,4 +152,10 @@ public class ErrorCode
public static final int INDEX_UPDATE_SINGLE_POINT_INDEX_FAIL = ERROR_INDEX_SERVER + 17;
public static final int INDEX_PURGE_MAIN_INDEX_FAIL = ERROR_INDEX_SERVER + 18;
public static final int INDEX_PURGE_SINGLE_POINT_FAIL = ERROR_INDEX_SERVER + 19;

// error code for node service
private static final int ERROR_NODE_SERVER = ERROR_BASE + 700;
public static final int NODE_RETINA_INFO_FAIL = ERROR_NODE_SERVER + 1;
public static final int NODE_NO_AVAILABLE = ERROR_NODE_SERVER + 2;
public static final int NODE_INVALID_BUCKET = ERROR_NODE_SERVER + 3;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright 2025 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/

package io.pixelsdb.pixels.common.node;

import com.google.protobuf.Empty;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.pixelsdb.pixels.common.server.HostAddress;
import io.pixelsdb.pixels.common.utils.ConfigFactory;
import io.pixelsdb.pixels.common.utils.ShutdownHookManager;
import io.pixelsdb.pixels.daemon.NodeProto;
import io.pixelsdb.pixels.daemon.NodeServiceGrpc;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class NodeService
{
private static final Logger logger = LogManager.getLogger(NodeService.class);

private static final NodeService defaultInstance;
private static final Map<HostAddress, NodeService> otherInstances = new ConcurrentHashMap<>();

static
{
String host = ConfigFactory.Instance().getProperty("node.server.host");
int port = Integer.parseInt(ConfigFactory.Instance().getProperty("node.server.port"));
Comment thread
bianhq marked this conversation as resolved.

defaultInstance = new NodeService(host, port);

ShutdownHookManager.Instance().registerShutdownHook(NodeService.class, false, () ->
{
try
{
defaultInstance.shutdown();
for (NodeService client : otherInstances.values())
{
client.shutdown();
}
otherInstances.clear();
} catch (InterruptedException e)
{
logger.error("Failed to shutdown NodeService", e);
}
});
}

private final ManagedChannel channel;
private final NodeServiceGrpc.NodeServiceBlockingStub stub;
private volatile boolean isShutDown;
private NodeService(String host, int port)
{
assert host != null;
assert port > 0 && port <= 65535;

this.channel = ManagedChannelBuilder.forAddress(host, port)
.usePlaintext()
.build();

this.stub = NodeServiceGrpc.newBlockingStub(channel);
this.isShutDown = false;
}

public static NodeService Instance()
{
return defaultInstance;
}

public static synchronized NodeService CreateInstance(String host, int port)
{
HostAddress address = HostAddress.fromParts(host, port);
NodeService client = otherInstances.get(address);
if (client != null)
{
return client;
}
client = new NodeService(host, port);
otherInstances.put(address, client);
return client;
}

private synchronized void shutdown() throws InterruptedException
{
if (!this.isShutDown)
{
this.channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
this.isShutDown = true;
}
}

public List<NodeProto.NodeInfo> getRetinaList()
{
NodeProto.GetRetinaListResponse resp;

try
{
resp = stub.getRetinaList(Empty.getDefaultInstance());
} catch (Exception e)
{
logger.error("Failed to call GetRetinaList", e);
throw e;
}

if (resp.getErrorCode() != 0)
{
logger.error("GetRetinaList returned error code {}", resp.getErrorCode());
return Collections.emptyList();
}

return resp.getNodesList();
}

public NodeProto.NodeInfo getRetinaByBucket(int bucketId)
{
NodeProto.GetRetinaByBucketRequest req =
NodeProto.GetRetinaByBucketRequest.newBuilder()
.setBucket(bucketId)
.build();

NodeProto.GetRetinaByBucketResponse resp;

try
{
resp = stub.getRetinaByBucket(req);
} catch (Exception e)
{
logger.error("Failed to call GetRetinaByBucket", e);
throw e;
}

if (resp.getErrorCode() != 0)
{
logger.error("GetRetinaByBucket returned error={}", resp.getErrorCode());
return null; // or throw exception
}

return resp.getNode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public final class Constants
public static final int HOSTNAME_INDEX_IN_CACHE_LOCATION_LITERAL = 3;
public static final String HEARTBEAT_COORDINATOR_LITERAL = "heartbeat_coordinator_";
public static final String HEARTBEAT_WORKER_LITERAL = "heartbeat_worker_";
public static final String HEARTBEAT_RETINA_LITERAL = "heartbeat_retina_";
public static final String CACHE_EXPAND_OR_SHRINK_LITERAL = "cache_expand_or_shrink"; // expand or shrink;1:expand, 2:shrink

public static final String PARTITION_OPERATOR_NAME = "partition";
Expand Down
7 changes: 7 additions & 0 deletions pixels-common/src/main/resources/pixels.properties
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ heartbeat.lease.ttl.seconds=20
# heartbeat period must be larger than 0
heartbeat.period.seconds=10

###### pixels-node settings ######
node.server.port=18891
node.server.host=localhost
# number of virtual nodes per physical node (used in consistent hashing)
node.virtual.num=16
Copy link

Copilot AI Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Add a comment to explain what node.bucket.num represents. This should clarify that it's the total number of hash buckets in the consistent hash ring.

Suggested change
node.virtual.num=16
node.virtual.num=16
# the total number of hash buckets in the consistent hash ring

Copilot uses AI. Check for mistakes.
node.bucket.num=1024

###### storage engine settings ######

### pixels reader, writer, and compactor settings ###
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2025 PixelsDB.
Comment thread
bianhq marked this conversation as resolved.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/

package io.pixelsdb.pixels.common.node;

import io.pixelsdb.pixels.daemon.NodeProto;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class TestNodeService
{
private static final Logger logger = LoggerFactory.getLogger(TestNodeService.class);
@Test
public void testGetRetinaList()
{
NodeService nodeService = NodeService.Instance();
List<NodeProto.NodeInfo> retinaList = nodeService.getRetinaList();
logger.debug("Retina List Size: {}", retinaList.size());
for(NodeProto.NodeInfo nodeInfo : retinaList)
{
logger.debug(nodeInfo.toString());
}
}

@Test
public void testGetRetinaByBucketId()
{
NodeService nodeService = NodeService.Instance();
NodeProto.NodeInfo retinaByBucket = nodeService.getRetinaByBucket(1);
logger.info("Retina By Bucket: {}", retinaByBucket);
}
}
Loading