Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions docs/network_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ Ether uses ZeroMQ (ZMQ) for network communication and Redis for state management

## Port Requirements
Ether requires the following TCP ports for communication. Default ports are:
- 5555: PubSub Frontend
- 5556: PubSub Backend
- 5559: ReqRep Frontend
- 5560: ReqRep Backend
- 6379: Redis
- 301309: Session Discovery
- 301310: Session Query
- 13311: PubSub Frontend
- 13312: PubSub Backend
- 13313: ReqRep Frontend
- 13314: ReqRep Backend
- 13315: Redis
- 13309: Session Discovery
- 13310: Session Query

> **Note:** All ports can be customized, but must match between server and clients.

Expand Down Expand Up @@ -44,13 +44,13 @@ To allow remote connections to an Ether server:
- Note the server's public IP address

2. **Router Configuration**
- Configure port forwarding for all required ports:
- Configure port forwarding for all required ports (defaults shown below):
```
TCP 5555 -> Server Local IP (PubSub Frontend)
TCP 5556 -> Server Local IP (PubSub Backend)
TCP 5559 -> Server Local IP (ReqRep Frontend)
TCP 5560 -> Server Local IP (ReqRep Backend)
TCP 6379 -> Server Local IP (Redis)
TCP 13311 -> Server Local IP (PubSub Frontend)
TCP 13312 -> Server Local IP (PubSub Backend)
TCP 13313 -> Server Local IP (ReqRep Frontend)
TCP 13314 -> Server Local IP (ReqRep Backend)
TCP 13315 -> Server Local IP (Redis)
```

3. **Server Configuration**
Expand All @@ -61,11 +61,11 @@ To allow remote connections to an Ether server:
# Configure server with public IP
network_config = EtherNetworkConfig(
host="your.public.ip", # Server's public IP
pubsub_frontend_port=5555,
pubsub_backend_port=5556,
reqrep_frontend_port=5559,
reqrep_backend_port=5560,
redis_port=6379
# pubsub_frontend_port=13311,
# pubsub_backend_port=13312,
# reqrep_frontend_port=13313,
# reqrep_backend_port=13314,
# redis_port=13315
)

config = EtherConfig(network=network_config)
Expand All @@ -86,10 +86,10 @@ To allow remote connections to an Ether server:
1. **Basic Port Test**
```bash
# On server
nc -l 5555
nc -l 13311

# On client
nc -v server.public.ip 5555
nc -v server.public.ip 13311
```

2. **Network Diagnostics**
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ webapp = [
[project.urls]
Homepage = "https://github.com/shababo/ether"
Issues = "https://github.com/shababo/ether/issues"

[dependency-groups]
dev = [
"pytest",
Expand Down
27 changes: 20 additions & 7 deletions src/ether/_internal/_config.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from enum import Enum
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
import yaml
Expand Down Expand Up @@ -26,14 +27,26 @@ class EtherClassConfig(BaseModel):
class EtherNetworkConfig(BaseModel):
"""Network configuration for Ether"""
host: str = "localhost"
pubsub_frontend_port: int = 5555
pubsub_backend_port: int = 5556
reqrep_frontend_port: int = 5559
reqrep_backend_port: int = 5560
pubsub_frontend_port: int = 13311
pubsub_backend_port: int = 13312
reqrep_frontend_port: int = 13313
reqrep_backend_port: int = 13314
redis_host: str = "0.0.0.0" # Add separate Redis host config
redis_port: int = 13311
session_discovery_port: int = 31309
session_query_port: int = 31310
redis_port: int = 13315
session_discovery_port: int = 13309
session_query_port: int = 13310

class EtherSecurityLevel(Enum):
BASIC = "basic" # Just group isolation
STANDARD = "standard" # + role-based access
HIGH = "high" # + session key rotation, logging


class EtherSecurityConfig(BaseModel):
"""Security configuration for Ether"""
security_level: EtherSecurityLevel = EtherSecurityLevel.STANDARD
group_key: Optional[str] = None
user_key: Optional[str] = None



Expand Down
13 changes: 9 additions & 4 deletions src/ether/_internal/_ether.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def _run_monitor(network_config: Optional[EtherNetworkConfig] = None):
logger.error(f"Error monitoring instances: {e}")
time.sleep(1)

def _run_reqrep_broker(frontend_port: int = 5559, backend_port: int = 5560):
def _run_reqrep_broker(frontend_port: int = 13313, backend_port: int = 13314):
"""Run the request-reply broker in a separate process"""
broker = EtherReqRepBroker(frontend_port=frontend_port, backend_port=backend_port)
try:
Expand Down Expand Up @@ -170,7 +170,7 @@ def start(self, ether_id: str, config = None, restart: bool = False, discovery =

session_metadata = None

self._logger.debug(f"Start called with ether_id={ether_id}, config={config}, restart={restart}")
self._logger.debug(f"Start called with ether_id={ether_id}, config={config}, restart={restart}, discovery={discovery}")
self._ether_id = ether_id

# Process configuration
Expand All @@ -188,6 +188,7 @@ def start(self, ether_id: str, config = None, restart: bool = False, discovery =
self._logger.error(f"Failed to process configuration: {e}", exc_info=True)
raise

# If we are running on the same machine as the session host, replace the public IP with the local IP
try:
public_ip = get_ip_address(use_public=True)
if config.network.host == public_ip:
Expand All @@ -203,10 +204,11 @@ def start(self, ether_id: str, config = None, restart: bool = False, discovery =

# Start session with network config
if discovery:
self._logger.debug("Starting session discovery process...")
try:
self._ether_session_process = Process(
target=session_discovery_launcher,
args=(self._ether_id, self._config.network)
kwargs={"ether_id": self._ether_id, "network_config": self._config.network}
)
self._ether_session_process.start()
time.sleep(1.0)
Expand Down Expand Up @@ -360,7 +362,10 @@ def _ensure_pubsub_running(self) -> bool:
def _ensure_reqrep_running(self) -> bool:
"""Ensure ReqRep broker is running"""
if self._reqrep_broker_process is None:
self._reqrep_broker_process = Process(target=_run_reqrep_broker)
self._reqrep_broker_process = Process(
target=_run_reqrep_broker,
args=(self._config.network.reqrep_frontend_port, self._config.network.reqrep_backend_port)
)
self._reqrep_broker_process.daemon = True
self._reqrep_broker_process.start()
return self._test_reqrep_connection()
Expand Down
12 changes: 6 additions & 6 deletions src/ether/_internal/_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,8 @@ def init_ether_vars(self, name=None, network_config: Optional[EtherNetworkConfig
self.results_file = None

# Register with instance tracker
self._instance_tracker = EtherInstanceLiaison(network_config=self.network_config)
self._instance_tracker.register_instance(f"{self.name}-{self.id}", self.get_metadata())
self._instance_liaison = EtherInstanceLiaison(network_config=self.network_config)
self._instance_liaison.register_instance(f"{self.name}-{self.id}", self.get_metadata())

# Add reqrep worker socket
self._worker_socket = None
Expand Down Expand Up @@ -567,8 +567,8 @@ def run(self):
try:
# Refresh TTL periodically
now = time.time()
if now - last_refresh >= (self._instance_tracker.ttl / 2):
self._instance_tracker.refresh_instance(self.id)
if now - last_refresh >= (self._instance_liaison.ttl / 2):
self._instance_liaison.refresh_instance(f"{self.name}-{self.id}")
last_refresh = now

# Create a poller to handle both sub and worker sockets
Expand Down Expand Up @@ -596,8 +596,8 @@ def run(self):

# Add cleanup
def cleanup(self):
if hasattr(self, '_instance_tracker'):
self._instance_tracker.deregister_instance(self.id)
if hasattr(self, '_instance_liaison'):
self._instance_liaison.deregister_instance(f"{self.name}-{self.id}")
if hasattr(self, '_sub_socket') and self._sub_socket:
self._sub_socket.close()
if hasattr(self, '_pub_socket') and self._pub_socket:
Expand Down
2 changes: 1 addition & 1 deletion src/ether/_internal/_reqrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class EtherReqRepBroker:
HEARTBEAT_INTERVAL = 2500 # msecs
HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable

def __init__(self, frontend_port: int = 5559, backend_port: int = 5560):
def __init__(self, frontend_port: int = 13313, backend_port: int = 13314):
self.id = str(uuid.uuid4())
self._logger = get_ether_logger("EtherReqRepBroker")
self._logger.debug("Initializing MDP broker")
Expand Down
Loading