From 5379505bd7707f63642c5e3b9fd097a167a37573 Mon Sep 17 00:00:00 2001 From: shababo Date: Sat, 25 Jan 2025 14:08:22 -0800 Subject: [PATCH 1/8] add ironhouse, curvezmq scripts from pyzmq and verify encryption --- src/scripts/security/generate_certificates.py | 61 +++++++++++ src/scripts/security/test_curve_security.py | 102 ++++++++++++++++++ 2 files changed, 163 insertions(+) create mode 100644 src/scripts/security/generate_certificates.py create mode 100644 src/scripts/security/test_curve_security.py diff --git a/src/scripts/security/generate_certificates.py b/src/scripts/security/generate_certificates.py new file mode 100644 index 0000000..f6b0618 --- /dev/null +++ b/src/scripts/security/generate_certificates.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python + +""" +Generate client and server CURVE certificate files then move them into the +appropriate store directory, private_keys or public_keys. The certificates +generated by this script are used by the stonehouse and ironhouse examples. + +In practice this would be done by hand or some out-of-band process. + +Author: Chris Laws +""" + +import os +import shutil +from typing import Union + +import zmq.auth + + +def generate_certificates(base_dir: Union[str, os.PathLike]) -> None: + '''Generate client and server CURVE certificate files''' + keys_dir = os.path.join(base_dir, 'certificates') + public_keys_dir = os.path.join(base_dir, 'public_keys') + secret_keys_dir = os.path.join(base_dir, 'private_keys') + + # Create directories for certificates, remove old content if necessary + for d in [keys_dir, public_keys_dir, secret_keys_dir]: + if os.path.exists(d): + shutil.rmtree(d) + os.mkdir(d) + + # create new keys in certificates dir + server_public_file, server_secret_file = zmq.auth.create_certificates( + keys_dir, "server" + ) + client_public_file, client_secret_file = zmq.auth.create_certificates( + keys_dir, "client" + ) + + # move public keys to appropriate directory + for key_file in os.listdir(keys_dir): + if key_file.endswith(".key"): + shutil.move( + os.path.join(keys_dir, key_file), os.path.join(public_keys_dir, '.') + ) + + # move secret keys to appropriate directory + for key_file in os.listdir(keys_dir): + if key_file.endswith(".key_secret"): + shutil.move( + os.path.join(keys_dir, key_file), os.path.join(secret_keys_dir, '.') + ) + + +if __name__ == '__main__': + if zmq.zmq_version_info() < (4, 0): + raise RuntimeError( + f"Security is not supported in libzmq version < 4.0. libzmq version {zmq.zmq_version()}" + ) + + generate_certificates(os.path.dirname(__file__)) \ No newline at end of file diff --git a/src/scripts/security/test_curve_security.py b/src/scripts/security/test_curve_security.py new file mode 100644 index 0000000..7636f10 --- /dev/null +++ b/src/scripts/security/test_curve_security.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python + +''' +Ironhouse extends Stonehouse with client public key authentication. + +This is the strongest security model we have today, protecting against every +attack we know about, except end-point attacks (where an attacker plants +spyware on a machine to capture data before it's encrypted, or after it's +decrypted). + +Author: Chris Laws +''' + +import logging +import os +import sys + +import zmq +import zmq.auth +from zmq.auth.thread import ThreadAuthenticator + + +def run() -> None: + '''Run Ironhouse example''' + + # These directories are generated by the generate_certificates script + base_dir = os.path.dirname(__file__) + keys_dir = os.path.join(base_dir, 'certificates') + public_keys_dir = os.path.join(base_dir, 'public_keys') + secret_keys_dir = os.path.join(base_dir, 'private_keys') + + if not ( + os.path.exists(keys_dir) + and os.path.exists(public_keys_dir) + and os.path.exists(secret_keys_dir) + ): + logging.critical( + "Certificates are missing - run generate_certificates.py script first" + ) + sys.exit(1) + + ctx = zmq.Context.instance() + + # Start an authenticator for this context. + auth = ThreadAuthenticator(ctx) + auth.start() + auth.allow('127.0.0.1') + # Tell authenticator to use the certificate in a directory + auth.configure_curve(domain='*', location=public_keys_dir) + + server = ctx.socket(zmq.PUSH) + + server_secret_file = os.path.join(secret_keys_dir, "server.key_secret") + server_public, server_secret = zmq.auth.load_certificate(server_secret_file) + server.curve_secretkey = server_secret + server.curve_publickey = server_public + server.curve_server = True # must come before bind + server.bind('tcp://*:9000') + + client = ctx.socket(zmq.PULL) + + # We need two certificates, one for the client and one for + # the server. The client must know the server's public key + # to make a CURVE connection. + client_secret_file = os.path.join(secret_keys_dir, "client.key_secret") + client_public, client_secret = zmq.auth.load_certificate(client_secret_file) + client.curve_secretkey = client_secret + client.curve_publickey = client_public + + server_public_file = os.path.join(public_keys_dir, "server.key") + server_public, _ = zmq.auth.load_certificate(server_public_file) + # The client must know the server's public key to make a CURVE connection. + client.curve_serverkey = server_public + client.connect('tcp://127.0.0.1:9000') + + server.send(b"Hello") + + if client.poll(1000): + msg = client.recv() + if msg == b"Hello": + logging.info("Ironhouse test OK") + else: + logging.error("Ironhouse test FAIL") + + # stop auth thread + auth.stop() + + +if __name__ == '__main__': + if zmq.zmq_version_info() < (4, 0): + raise RuntimeError( + f"Security is not supported in libzmq version < 4.0. libzmq version {zmq.zmq_version()}" + ) + + if '-v' in sys.argv: + level = logging.DEBUG + else: + level = logging.INFO + + logging.basicConfig(level=level, format="[%(levelname)s] %(message)s") + + run() \ No newline at end of file From 9af60a531095ca59a30074aeaaeeb27f7889b231 Mon Sep 17 00:00:00 2001 From: shababo Date: Mon, 27 Jan 2025 14:12:41 -0800 Subject: [PATCH 2/8] add _security module with some initial classes --- pyproject.toml | 2 +- src/ether/_internal/_security.py | 147 +++++++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 src/ether/_internal/_security.py diff --git a/pyproject.toml b/pyproject.toml index d455c3d..92d6826 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,7 @@ webapp = [ [project.urls] Homepage = "https://github.com/shababo/ether" Issues = "https://github.com/shababo/ether/issues" - + [dependency-groups] dev = [ "pytest", diff --git a/src/ether/_internal/_security.py b/src/ether/_internal/_security.py new file mode 100644 index 0000000..8b7828b --- /dev/null +++ b/src/ether/_internal/_security.py @@ -0,0 +1,147 @@ +import os +import sys +import zmq +import json +from pathlib import Path +from datetime import datetime, timezone +from zmq.auth import create_certificates, load_certificate +from typing import Tuple, Optional + +class KeyManager: + def __init__(self, app_name: str = "myapp"): + self.app_name = app_name + self.keys_dir = self._get_platform_config_dir() / app_name / "keys" + self.metadata_file = self.keys_dir / "metadata.json" + self._ensure_dirs() + + def _get_platform_config_dir(self) -> Path: + """Get the appropriate config directory for the current platform""" + if sys.platform.startswith('win'): + # Windows: C:\Users\Username\AppData\Local + return Path(os.environ.get('LOCALAPPDATA')) + elif sys.platform.startswith('darwin'): + # macOS: ~/Library/Application Support + return Path.home() / "Library" / "Application Support" + else: + # Linux/Unix: ~/.config + return Path.home() / ".config" + + def _ensure_dirs(self) -> None: + """Create necessary directories if they don't exist""" + self.keys_dir.mkdir(parents=True, exist_ok=True) + + def _save_metadata(self, metadata: dict) -> None: + """Save key metadata""" + with open(self.metadata_file, 'w') as f: + json.dump(metadata, f) + + def _load_metadata(self) -> dict: + """Load key metadata""" + if self.metadata_file.exists(): + with open(self.metadata_file, 'r') as f: + return json.load(f) + return {} + + def _should_rotate_keys(self, metadata: dict) -> bool: + """Check if keys should be rotated based on age""" + if not metadata: + return True + + created_at = datetime.fromisoformat(metadata.get('created_at', '')) + now = datetime.now(timezone.utc) + # Rotate keys if they're older than 90 days + return (now - created_at).days > 90 + + def get_or_create_certificates(self) -> Tuple[bytes, bytes]: + """Get existing certificates or create new ones""" + metadata = self._load_metadata() + + # Check for key rotation + if self._should_rotate_keys(metadata): + return self._create_new_certificates() + + # Load existing certificates + try: + public_key, secret_key = load_certificate( + self.keys_dir / "server.key_secret" + ) + return public_key, secret_key + except Exception: + return self._create_new_certificates() + + def _create_new_certificates(self) -> Tuple[bytes, bytes]: + """Create new certificates and save metadata""" + # Archive old keys if they exist + self._archive_old_keys() + + # Create new keys + public_file, secret_file = create_certificates( + self.keys_dir, "server" + ) + + # Save metadata + metadata = { + 'created_at': datetime.now(timezone.utc).isoformat(), + 'public_key': str(public_file), + 'secret_key': str(secret_file) + } + self._save_metadata(metadata) + + # Load and return the new certificates + return load_certificate(self.keys_dir / "server.key_secret") + + def _archive_old_keys(self) -> None: + """Archive old keys with timestamp""" + if not self.metadata_file.exists(): + return + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + archive_dir = self.keys_dir / "archive" / timestamp + archive_dir.mkdir(parents=True, exist_ok=True) + + # Move old keys to archive + for key_file in self.keys_dir.glob("server.key*"): + if key_file.is_file(): + key_file.rename(archive_dir / key_file.name) + +class SecureMessaging: + def __init__(self, app_name: str = "myapp"): + self.context = zmq.Context() + self.key_manager = KeyManager(app_name) + self._public_key = None + self._secret_key = None + + @property + def keys(self) -> Tuple[bytes, bytes]: + """Lazy load keys""" + if not (self._public_key and self._secret_key): + self._public_key, self._secret_key = ( + self.key_manager.get_or_create_certificates() + ) + return self._public_key, self._secret_key + + def create_secure_socket(self, socket_type: int) -> zmq.Socket: + """Create a secure socket of the specified type""" + socket = self.context.socket(socket_type) + public_key, secret_key = self.keys + + socket.curve_secretkey = secret_key + socket.curve_publickey = public_key + socket.curve_server = True + + return socket + + def create_secure_client_socket( + self, + socket_type: int, + server_public_key: bytes + ) -> zmq.Socket: + """Create a secure client socket""" + socket = self.context.socket(socket_type) + public_key, secret_key = self.keys + + socket.curve_secretkey = secret_key + socket.curve_publickey = public_key + socket.curve_serverkey = server_public_key + + return socket \ No newline at end of file From 0fe053814d9cf410881ddbeaaf5d069d554eb936 Mon Sep 17 00:00:00 2001 From: shababo Date: Tue, 28 Jan 2025 16:34:53 -0800 Subject: [PATCH 3/8] update port numbers --- src/ether/_internal/_config.py | 27 +++- src/ether/_internal/_security.py | 214 ++++++++++++++++++++++++++++++- 2 files changed, 233 insertions(+), 8 deletions(-) diff --git a/src/ether/_internal/_config.py b/src/ether/_internal/_config.py index 9c166bc..1527f23 100644 --- a/src/ether/_internal/_config.py +++ b/src/ether/_internal/_config.py @@ -1,3 +1,4 @@ +from enum import Enum from typing import Any, Dict, Optional from pydantic import BaseModel, Field import yaml @@ -26,14 +27,26 @@ class EtherClassConfig(BaseModel): class EtherNetworkConfig(BaseModel): """Network configuration for Ether""" host: str = "localhost" - pubsub_frontend_port: int = 5555 - pubsub_backend_port: int = 5556 - reqrep_frontend_port: int = 5559 - reqrep_backend_port: int = 5560 + pubsub_frontend_port: int = 13311 + pubsub_backend_port: int = 13312 + reqrep_frontend_port: int = 13313 + reqrep_backend_port: int = 13314 redis_host: str = "0.0.0.0" # Add separate Redis host config - redis_port: int = 13311 - session_discovery_port: int = 31309 - session_query_port: int = 31310 + redis_port: int = 13315 + session_discovery_port: int = 13309 + session_query_port: int = 13310 + +class EtherSecurityLevel(Enum): + BASIC = "basic" # Just group isolation + STANDARD = "standard" # + role-based access + HIGH = "high" # + session key rotation, logging + + +class EtherSecurityConfig(BaseModel): + """Security configuration for Ether""" + security_level: EtherSecurityLevel = EtherSecurityLevel.STANDARD + group_key: Optional[str] = None + user_key: Optional[str] = None diff --git a/src/ether/_internal/_security.py b/src/ether/_internal/_security.py index 8b7828b..1991674 100644 --- a/src/ether/_internal/_security.py +++ b/src/ether/_internal/_security.py @@ -104,6 +104,104 @@ def _archive_old_keys(self) -> None: if key_file.is_file(): key_file.rename(archive_dir / key_file.name) +class SessionManager: + def start_key_rotation(self, socket, peer_id): + """Periodically rotate session keys while maintaining connection""" + def rotate_keys(): + while True: + new_keys = self.generate_session_keys() + self.signal_key_rotation(socket, peer_id, new_keys) + time.sleep(KEY_ROTATION_INTERVAL) + + threading.Thread(target=rotate_keys, daemon=True).start() + + + +class SessionManager4: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if not hasattr(self, 'initialized'): + self.group_key = None + self.security_level = SecurityLevel.BASIC + self.roles = {} + self.is_host = False + self.session_keys = {} # For HIGH security with key rotation + self.initialized = True + + def set_group_key(self, key: str): + self.group_key = key + # Derive base keys for all ZMQ connections + self.base_public_key, self.base_secret_key = derive_keys(key) + +class SessionManager3: + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if not hasattr(self, 'initialized'): + self.session_key = None + self.host_key = None + self.security_level = SecurityLevel.STANDARD + self.initialized = True + + def create_session(self, security_level: SecurityLevel = SecurityLevel.STANDARD) -> str: + """Host creates a new session""" + self.session_key = generate_session_key() # Strong random key + self.host_key = generate_host_key() # Host's admin key + self.security_level = security_level + return self.session_key + + def join_session(self, session_key: str): + """Clients join existing session""" + self.session_key = session_key + +class SecureMessaging3: + def __init__(self): + self.context = zmq.Context() + self.session = SessionManager() + + def create_secure_socket(self, socket_type: int) -> zmq.Socket: + socket = self.context.socket(socket_type) + + # Use session key for encryption + if self.session.session_key: + public_key, secret_key = derive_keys(self.session.session_key) + socket.curve_secretkey = secret_key + socket.curve_publickey = public_key + socket.curve_server = True + + return socket + +class SecureMessaging2: + def __init__(self): + self.context = zmq.Context() + self.key_manager = KeyManager() + self.session_manager = SessionManager() + + def create_secure_socket(self, socket_type: int, peer_id: str = None): + socket = self.context.socket(socket_type) + + # Base security with CurveZMQ + public_key, secret_key = self.key_manager.keys + socket.curve_secretkey = secret_key + socket.curve_publickey = public_key + + # Add session key rotation for long-term connections + if peer_id: + self.session_manager.start_key_rotation(socket, peer_id) + + return socket + class SecureMessaging: def __init__(self, app_name: str = "myapp"): self.context = zmq.Context() @@ -144,4 +242,118 @@ def create_secure_client_socket( socket.curve_publickey = public_key socket.curve_serverkey = server_public_key - return socket \ No newline at end of file + return socket + +# Access Control and Method Security +class ServiceDecorator: + def __call__(self, func): + @wraps(func) + def wrapper(*args, **kwargs): + # Verify caller has permission to execute this method + if not self.verify_caller_permissions(func.__name__): + raise SecurityError("Unauthorized method call") + + # Validate input parameters + if not self.validate_parameters(func, args, kwargs): + raise SecurityError("Invalid parameters") + + # Log access for audit + self.log_method_access(func.__name__) + + return func(*args, **kwargs) + return wrapper + + def verify_caller_permissions(self, method_name): + """Check if caller is allowed to execute this method""" + # Implement role-based access control + return True + +# Equipment/Resource Protection +class ResourceManager: + def __init__(self): + self.protected_resources = set() + self.resource_locks = {} + + def protect_resource(self, resource_name, allowed_operations): + """Register protected lab equipment/resources""" + self.protected_resources.add({ + 'name': resource_name, + 'allowed_ops': allowed_operations, + 'lock': threading.Lock() + }) + + def validate_operation(self, resource, operation): + """Ensure operation is safe and allowed""" + if resource not in self.protected_resources: + return False + return operation in resource['allowed_ops'] + +# Message Privacy and Integrity +class MessageHandler: + def __init__(self): + self.security_level = SecurityLevel.STANDARD + + def send_message(self, socket, message, security_level=None): + """Send message with appropriate security""" + level = security_level or self.security_level + + if level == SecurityLevel.HIGH: + # Use additional encryption for sensitive data + message = self.encrypt_sensitive_data(message) + + # Add message integrity check + message['hmac'] = self.generate_hmac(message) + + socket.send_json(message) + +# System Hardening +class SystemSecurity: + def harden_environment(self): + """Implement system-level security measures""" + # Restrict file system access + self.set_working_directory() + + # Monitor system resources + self.start_resource_monitor() + + # Set up network isolation + self.configure_network_restrictions() + + def set_working_directory(self): + """Restrict file system access to specific directories""" + os.chdir(self.safe_working_dir) + + def start_resource_monitor(self): + """Monitor CPU, memory, disk usage for anomalies""" + ResourceMonitor().start() + + +""" + +Example usage: + +# Decorating a lab equipment method +@service(security_level=SecurityLevel.HIGH) +@requires_permission('equipment_control') +def control_microscope(self, parameters): + if not resource_manager.validate_operation('microscope', parameters['operation']): + raise SecurityError("Operation not allowed") + + # Proceed with microscope control + return self._execute_microscope_command(parameters) + +# Long-running data acquisition +@service(security_level=SecurityLevel.STANDARD) +@requires_permission('data_acquisition') +def acquire_data(self, duration_hours): + session = SessionManager().create_session() + + try: + while session.running: + data = self._collect_data_point() + self.send_secure_message(data, session) + time.sleep(collection_interval) + finally: + session.close() + +""" From f457215ee962838d2c8b9179036f11a89aa8cf96 Mon Sep 17 00:00:00 2001 From: shababo Date: Tue, 28 Jan 2025 16:45:10 -0800 Subject: [PATCH 4/8] update port values to new ether defaults --- docs/network_configuration.md | 40 +++++++++---------- src/ether/utils.py | 4 +- src/scripts/network_setup/check_server.py | 10 ++--- .../network_setup/diagnose_connection.py | 2 +- src/scripts/network_setup/open_ports_linux.sh | 24 +++++------ src/scripts/network_setup/open_ports_macos.sh | 10 ++--- src/tests/test_ether_network.py | 20 +++++----- 7 files changed, 55 insertions(+), 55 deletions(-) diff --git a/docs/network_configuration.md b/docs/network_configuration.md index e2bbb7c..b141847 100644 --- a/docs/network_configuration.md +++ b/docs/network_configuration.md @@ -5,13 +5,13 @@ Ether uses ZeroMQ (ZMQ) for network communication and Redis for state management ## Port Requirements Ether requires the following TCP ports for communication. Default ports are: -- 5555: PubSub Frontend -- 5556: PubSub Backend -- 5559: ReqRep Frontend -- 5560: ReqRep Backend -- 6379: Redis -- 301309: Session Discovery -- 301310: Session Query +- 13311: PubSub Frontend +- 13312: PubSub Backend +- 13313: ReqRep Frontend +- 13314: ReqRep Backend +- 13315: Redis +- 13309: Session Discovery +- 13310: Session Query > **Note:** All ports can be customized, but must match between server and clients. @@ -44,13 +44,13 @@ To allow remote connections to an Ether server: - Note the server's public IP address 2. **Router Configuration** - - Configure port forwarding for all required ports: + - Configure port forwarding for all required ports (defaults shown below): ``` - TCP 5555 -> Server Local IP (PubSub Frontend) - TCP 5556 -> Server Local IP (PubSub Backend) - TCP 5559 -> Server Local IP (ReqRep Frontend) - TCP 5560 -> Server Local IP (ReqRep Backend) - TCP 6379 -> Server Local IP (Redis) + TCP 13311 -> Server Local IP (PubSub Frontend) + TCP 13312 -> Server Local IP (PubSub Backend) + TCP 13313 -> Server Local IP (ReqRep Frontend) + TCP 13314 -> Server Local IP (ReqRep Backend) + TCP 13315 -> Server Local IP (Redis) ``` 3. **Server Configuration** @@ -61,11 +61,11 @@ To allow remote connections to an Ether server: # Configure server with public IP network_config = EtherNetworkConfig( host="your.public.ip", # Server's public IP - pubsub_frontend_port=5555, - pubsub_backend_port=5556, - reqrep_frontend_port=5559, - reqrep_backend_port=5560, - redis_port=6379 + # pubsub_frontend_port=5555, + # pubsub_backend_port=5556, + # reqrep_frontend_port=5559, + # reqrep_backend_port=5560, + # redis_port=6379 ) config = EtherConfig(network=network_config) @@ -86,10 +86,10 @@ To allow remote connections to an Ether server: 1. **Basic Port Test** ```bash # On server - nc -l 5555 + nc -l 13311 # On client - nc -v server.public.ip 5555 + nc -v server.public.ip 13311 ``` 2. **Network Diagnostics** diff --git a/src/ether/utils.py b/src/ether/utils.py index c9028c5..0807c0b 100644 --- a/src/ether/utils.py +++ b/src/ether/utils.py @@ -4,8 +4,8 @@ import socket import requests # Standard ports for Ether communication -_ETHER_SUB_PORT = 5555 # subscribe to this port -_ETHER_PUB_PORT = 5556 # publish to this port +_ETHER_SUB_PORT = 13311 # subscribe to this port +_ETHER_PUB_PORT = 13312 # publish to this port # Log directory structure _LOG_DIR = Path("logs") diff --git a/src/scripts/network_setup/check_server.py b/src/scripts/network_setup/check_server.py index 38b4789..203e015 100644 --- a/src/scripts/network_setup/check_server.py +++ b/src/scripts/network_setup/check_server.py @@ -32,11 +32,11 @@ def main(): host = sys.argv[1] ports = { - 5555: "PubSub Frontend", - 5556: "PubSub Backend", - 5559: "ReqRep Frontend", - 5560: "ReqRep Backend", - 6379: "Redis" + 13311: "PubSub Frontend", + 13312: "PubSub Backend", + 13313: "ReqRep Frontend", + 13314: "ReqRep Backend", + 13315: "Redis" } print(f"\nChecking Ether ports on {host}...") diff --git a/src/scripts/network_setup/diagnose_connection.py b/src/scripts/network_setup/diagnose_connection.py index 2436c62..eb2cb07 100644 --- a/src/scripts/network_setup/diagnose_connection.py +++ b/src/scripts/network_setup/diagnose_connection.py @@ -29,7 +29,7 @@ def get_network_info(): def test_local_ports(host): """Test if ports are in use locally""" - ports = [5555, 5556, 5559, 5560, 6379] + ports = [13311, 13312, 13313, 13314, 13315] results = {} for port in ports: diff --git a/src/scripts/network_setup/open_ports_linux.sh b/src/scripts/network_setup/open_ports_linux.sh index 68e2574..01ccb8e 100755 --- a/src/scripts/network_setup/open_ports_linux.sh +++ b/src/scripts/network_setup/open_ports_linux.sh @@ -11,27 +11,27 @@ if command -v ufw >/dev/null 2>&1; then echo "Using UFW firewall..." # Allow ZMQ and Redis ports - ufw allow 5555/tcp # PubSub frontend - ufw allow 5556/tcp # PubSub backend - ufw allow 5559/tcp # ReqRep frontend - ufw allow 5560/tcp # ReqRep backend - ufw allow 6379/tcp # Redis + ufw allow 13311/tcp # PubSub frontend + ufw allow 13312/tcp # PubSub backend + ufw allow 13313/tcp # ReqRep frontend + ufw allow 13314/tcp # ReqRep backend + ufw allow 13315/tcp # Redis echo "UFW rules added. Current status:" - ufw status | grep -E '5555|5556|5559|5560|6379' + ufw status | grep -E '13311|13312|13313|13314|13315' elif command -v iptables >/dev/null 2>&1; then echo "Using iptables..." # Allow ZMQ and Redis ports - iptables -A INPUT -p tcp --dport 5555 -j ACCEPT - iptables -A INPUT -p tcp --dport 5556 -j ACCEPT - iptables -A INPUT -p tcp --dport 5559 -j ACCEPT - iptables -A INPUT -p tcp --dport 5560 -j ACCEPT - iptables -A INPUT -p tcp --dport 6379 -j ACCEPT + iptables -A INPUT -p tcp --dport 13311 -j ACCEPT + iptables -A INPUT -p tcp --dport 13312 -j ACCEPT + iptables -A INPUT -p tcp --dport 13313 -j ACCEPT + iptables -A INPUT -p tcp --dport 13314 -j ACCEPT + iptables -A INPUT -p tcp --dport 13315 -j ACCEPT echo "IPTables rules added. Current rules:" - iptables -L | grep -E '5555|5556|5559|5560|6379' + iptables -L | grep -E '13311|13312|13313|13314|13315' else echo "No supported firewall found" exit 1 diff --git a/src/scripts/network_setup/open_ports_macos.sh b/src/scripts/network_setup/open_ports_macos.sh index abed7c3..b1ae013 100755 --- a/src/scripts/network_setup/open_ports_macos.sh +++ b/src/scripts/network_setup/open_ports_macos.sh @@ -20,11 +20,11 @@ echo "Configuring firewall for Python at: $PYTHON_PATH" /usr/libexec/ApplicationFirewall/socketfilter --unblock "$PYTHON_PATH" echo "Firewall configured for Python. The following ports should now be accessible:" -echo "- 5555 (PubSub frontend)" -echo "- 5556 (PubSub backend)" -echo "- 5559 (ReqRep frontend)" -echo "- 5560 (ReqRep backend)" -echo "- 6379 (Redis)" +echo "- 13311 (PubSub frontend)" +echo "- 13312 (PubSub backend)" +echo "- 13313 (ReqRep frontend)" +echo "- 13314 (ReqRep backend)" +echo "- 13315 (Redis)" # Restart firewall to apply changes pfctl -F all -f /etc/pf.conf 2>/dev/null diff --git a/src/tests/test_ether_network.py b/src/tests/test_ether_network.py index 194cd79..6d3d157 100644 --- a/src/tests/test_ether_network.py +++ b/src/tests/test_ether_network.py @@ -48,20 +48,20 @@ def test_ether_network_communication(): # Create network configs for "server" and "client" server_network = EtherNetworkConfig( host=local_ip, # Use actual IP for server - pubsub_frontend_port=5555, - pubsub_backend_port=5556, - reqrep_frontend_port=5559, - reqrep_backend_port=5560, - redis_port=6379 + # pubsub_frontend_port=5555, + # pubsub_backend_port=5556, + # reqrep_frontend_port=5559, + # reqrep_backend_port=5560, + # redis_port=6379 ) client_network = EtherNetworkConfig( host=local_ip, # Connect to server IP - pubsub_frontend_port=5555, - pubsub_backend_port=5556, - reqrep_frontend_port=5559, - reqrep_backend_port=5560, - redis_port=6379 + # pubsub_frontend_port=5555, + # pubsub_backend_port=5556, + # reqrep_frontend_port=5559, + # reqrep_backend_port=5560, + # redis_port=6379 ) # Create server config From fc234bd7145fa186a5a5f188e4546a052bab5bca Mon Sep 17 00:00:00 2001 From: shababo Date: Tue, 28 Jan 2025 17:05:50 -0800 Subject: [PATCH 5/8] continue updating port values throughout codebase --- docs/network_configuration.md | 10 +++++----- src/ether/_internal/_ether.py | 7 +++++-- src/ether/_internal/_reqrep.py | 2 +- .../network_setup/diagnose_connection.py | 4 ++-- .../network_setup/open_ports_windows.ps1 | 10 +++++----- src/tests/test_ether_multiservice.py | 4 ++-- src/tests/test_ether_network.py | 20 +++++++++---------- src/tests/test_ether_reqrep.py | 4 ++-- src/tests/test_external_integration.py | 2 +- src/tests/test_reqrep.py | 6 +++--- 10 files changed, 36 insertions(+), 33 deletions(-) diff --git a/docs/network_configuration.md b/docs/network_configuration.md index b141847..00a6ac6 100644 --- a/docs/network_configuration.md +++ b/docs/network_configuration.md @@ -61,11 +61,11 @@ To allow remote connections to an Ether server: # Configure server with public IP network_config = EtherNetworkConfig( host="your.public.ip", # Server's public IP - # pubsub_frontend_port=5555, - # pubsub_backend_port=5556, - # reqrep_frontend_port=5559, - # reqrep_backend_port=5560, - # redis_port=6379 + # pubsub_frontend_port=13311, + # pubsub_backend_port=13312, + # reqrep_frontend_port=13313, + # reqrep_backend_port=13314, + # redis_port=13315 ) config = EtherConfig(network=network_config) diff --git a/src/ether/_internal/_ether.py b/src/ether/_internal/_ether.py index 151d932..1813329 100644 --- a/src/ether/_internal/_ether.py +++ b/src/ether/_internal/_ether.py @@ -82,7 +82,7 @@ def _run_monitor(network_config: Optional[EtherNetworkConfig] = None): logger.error(f"Error monitoring instances: {e}") time.sleep(1) -def _run_reqrep_broker(frontend_port: int = 5559, backend_port: int = 5560): +def _run_reqrep_broker(frontend_port: int = 13313, backend_port: int = 13314): """Run the request-reply broker in a separate process""" broker = EtherReqRepBroker(frontend_port=frontend_port, backend_port=backend_port) try: @@ -360,7 +360,10 @@ def _ensure_pubsub_running(self) -> bool: def _ensure_reqrep_running(self) -> bool: """Ensure ReqRep broker is running""" if self._reqrep_broker_process is None: - self._reqrep_broker_process = Process(target=_run_reqrep_broker) + self._reqrep_broker_process = Process( + target=_run_reqrep_broker, + args=(self._config.network.reqrep_frontend_port, self._config.network.reqrep_backend_port) + ) self._reqrep_broker_process.daemon = True self._reqrep_broker_process.start() return self._test_reqrep_connection() diff --git a/src/ether/_internal/_reqrep.py b/src/ether/_internal/_reqrep.py index f3b5732..0a3a60c 100644 --- a/src/ether/_internal/_reqrep.py +++ b/src/ether/_internal/_reqrep.py @@ -50,7 +50,7 @@ class EtherReqRepBroker: HEARTBEAT_INTERVAL = 2500 # msecs HEARTBEAT_LIVENESS = 3 # 3-5 is reasonable - def __init__(self, frontend_port: int = 5559, backend_port: int = 5560): + def __init__(self, frontend_port: int = 13313, backend_port: int = 13314): self.id = str(uuid.uuid4()) self._logger = get_ether_logger("EtherReqRepBroker") self._logger.debug("Initializing MDP broker") diff --git a/src/scripts/network_setup/diagnose_connection.py b/src/scripts/network_setup/diagnose_connection.py index eb2cb07..691a684 100644 --- a/src/scripts/network_setup/diagnose_connection.py +++ b/src/scripts/network_setup/diagnose_connection.py @@ -78,7 +78,7 @@ def main(): print(f"Port {port}: {status}") print("\n3. Connection Test to Target:") - ports = [5555, 5556, 5559, 5560, 6379] + ports = [13311, 13312, 13313, 13314, 13315] for port in ports: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(2) @@ -109,7 +109,7 @@ def main(): print("\n7. Active Network Connections:") if platform.system() in ["Linux", "Darwin"]: - print(run_command("netstat -an | grep -E '5555|5556|5559|5560|6379'")) + print(run_command("netstat -an | grep -E '13311|13312|13313|13314|13315'")) if __name__ == "__main__": main() \ No newline at end of file diff --git a/src/scripts/network_setup/open_ports_windows.ps1 b/src/scripts/network_setup/open_ports_windows.ps1 index 6a53059..26bcf8c 100644 --- a/src/scripts/network_setup/open_ports_windows.ps1 +++ b/src/scripts/network_setup/open_ports_windows.ps1 @@ -6,11 +6,11 @@ if (-NOT ([Security.Principal.WindowsPrincipal][Security.Principal.WindowsIdenti # Array of port configurations $ports = @( - @{Port = 5555; Name = "Ether PubSub Frontend"}, - @{Port = 5556; Name = "Ether PubSub Backend"}, - @{Port = 5559; Name = "Ether ReqRep Frontend"}, - @{Port = 5560; Name = "Ether ReqRep Backend"}, - @{Port = 6379; Name = "Ether Redis"} + @{Port = 13311; Name = "Ether PubSub Frontend"}, + @{Port = 13312; Name = "Ether PubSub Backend"}, + @{Port = 13313; Name = "Ether ReqRep Frontend"}, + @{Port = 13314; Name = "Ether ReqRep Backend"}, + @{Port = 13315; Name = "Ether Redis"} ) # Add firewall rules for each port diff --git a/src/tests/test_ether_multiservice.py b/src/tests/test_ether_multiservice.py index 82941d8..69623a3 100644 --- a/src/tests/test_ether_multiservice.py +++ b/src/tests/test_ether_multiservice.py @@ -28,7 +28,7 @@ def run_worker(service_name): socket = context.socket(zmq.DEALER) socket.linger = 0 socket.setsockopt(zmq.RCVTIMEO, 2500) - socket.connect("tcp://localhost:5560") + socket.connect("tcp://localhost:13314") try: # Register with broker @@ -92,7 +92,7 @@ def run_client(client_id, services): context = zmq.Context() socket = context.socket(zmq.DEALER) socket.setsockopt(zmq.RCVTIMEO, 2500) - socket.connect("tcp://localhost:5559") + socket.connect("tcp://localhost:13313") try: replies_received = {service: set() for service in services} diff --git a/src/tests/test_ether_network.py b/src/tests/test_ether_network.py index 6d3d157..6d1cf19 100644 --- a/src/tests/test_ether_network.py +++ b/src/tests/test_ether_network.py @@ -48,20 +48,20 @@ def test_ether_network_communication(): # Create network configs for "server" and "client" server_network = EtherNetworkConfig( host=local_ip, # Use actual IP for server - # pubsub_frontend_port=5555, - # pubsub_backend_port=5556, - # reqrep_frontend_port=5559, - # reqrep_backend_port=5560, - # redis_port=6379 + # pubsub_frontend_port=13311, + # pubsub_backend_port=13312, + # reqrep_frontend_port=13313, + # reqrep_backend_port=13314, + # redis_port=13315 ) client_network = EtherNetworkConfig( host=local_ip, # Connect to server IP - # pubsub_frontend_port=5555, - # pubsub_backend_port=5556, - # reqrep_frontend_port=5559, - # reqrep_backend_port=5560, - # redis_port=6379 + # pubsub_frontend_port=13311, + # pubsub_backend_port=13312, + # reqrep_frontend_port=13313, + # reqrep_backend_port=13314, + # redis_port=13315 ) # Create server config diff --git a/src/tests/test_ether_reqrep.py b/src/tests/test_ether_reqrep.py index 97399e3..a5a1d4d 100644 --- a/src/tests/test_ether_reqrep.py +++ b/src/tests/test_ether_reqrep.py @@ -26,7 +26,7 @@ def run_worker(service_name): context = zmq.Context() socket = context.socket(zmq.DEALER) socket.setsockopt(zmq.RCVTIMEO, 2500) - socket.connect("tcp://localhost:5560") + socket.connect("tcp://localhost:13314") try: # Register with broker @@ -83,7 +83,7 @@ def run_client(service_name, client_id): context = zmq.Context() socket = context.socket(zmq.DEALER) socket.setsockopt(zmq.RCVTIMEO, 2500) - socket.connect("tcp://localhost:5559") + socket.connect("tcp://localhost:13313") try: replies_received = set() diff --git a/src/tests/test_external_integration.py b/src/tests/test_external_integration.py index 60bb658..7921c81 100644 --- a/src/tests/test_external_integration.py +++ b/src/tests/test_external_integration.py @@ -13,7 +13,7 @@ def test_external_integration(): # Setup ZMQ subscriber to listen for messages context = zmq.Context() socket = context.socket(zmq.SUB) - socket.connect("tcp://localhost:5555") + socket.connect("tcp://localhost:13311") socket.subscribe("test_processed") # Subscribe to processor output topic config_dict = { diff --git a/src/tests/test_reqrep.py b/src/tests/test_reqrep.py index 8d66ea9..7092db3 100644 --- a/src/tests/test_reqrep.py +++ b/src/tests/test_reqrep.py @@ -15,7 +15,7 @@ def run_broker(): """Run the broker in a separate process""" - broker = EtherReqRepBroker(frontend_port=5559, backend_port=5560) + broker = EtherReqRepBroker(frontend_port=13313, backend_port=13314) try: broker.run() except KeyboardInterrupt: @@ -29,7 +29,7 @@ def run_worker(service_name): context = zmq.Context() socket = context.socket(zmq.DEALER) socket.setsockopt(zmq.RCVTIMEO, 2500) - socket.connect("tcp://localhost:5560") + socket.connect("tcp://localhost:13314") try: # Register with broker @@ -86,7 +86,7 @@ def run_client(service_name, client_id): context = zmq.Context() socket = context.socket(zmq.DEALER) socket.setsockopt(zmq.RCVTIMEO, 2500) - socket.connect("tcp://localhost:5559") + socket.connect("tcp://localhost:13313") try: replies_received = set() From 75566fc0e65ed0c019054b60674ff29ab558b210 Mon Sep 17 00:00:00 2001 From: shababo Date: Wed, 29 Jan 2025 16:03:47 -0800 Subject: [PATCH 6/8] fix instance ttl key name bug --- src/ether/_internal/_registry.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/ether/_internal/_registry.py b/src/ether/_internal/_registry.py index 1109136..cd08812 100644 --- a/src/ether/_internal/_registry.py +++ b/src/ether/_internal/_registry.py @@ -200,8 +200,8 @@ def init_ether_vars(self, name=None, network_config: Optional[EtherNetworkConfig self.results_file = None # Register with instance tracker - self._instance_tracker = EtherInstanceLiaison(network_config=self.network_config) - self._instance_tracker.register_instance(f"{self.name}-{self.id}", self.get_metadata()) + self._instance_liaison = EtherInstanceLiaison(network_config=self.network_config) + self._instance_liaison.register_instance(f"{self.name}-{self.id}", self.get_metadata()) # Add reqrep worker socket self._worker_socket = None @@ -567,8 +567,8 @@ def run(self): try: # Refresh TTL periodically now = time.time() - if now - last_refresh >= (self._instance_tracker.ttl / 2): - self._instance_tracker.refresh_instance(self.id) + if now - last_refresh >= (self._instance_liaison.ttl / 2): + self._instance_liaison.refresh_instance(f"{self.name}-{self.id}") last_refresh = now # Create a poller to handle both sub and worker sockets @@ -596,8 +596,8 @@ def run(self): # Add cleanup def cleanup(self): - if hasattr(self, '_instance_tracker'): - self._instance_tracker.deregister_instance(self.id) + if hasattr(self, '_instance_liaison'): + self._instance_liaison.deregister_instance(f"{self.name}-{self.id}") if hasattr(self, '_sub_socket') and self._sub_socket: self._sub_socket.close() if hasattr(self, '_pub_socket') and self._pub_socket: From b404e7b6a0e53246db21911b740c33abf6e33930 Mon Sep 17 00:00:00 2001 From: shababo Date: Thu, 30 Jan 2025 23:19:31 -0800 Subject: [PATCH 7/8] add network config to discovery launcher --- src/ether/_internal/_ether.py | 4 +++- src/ether/_internal/_session.py | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/ether/_internal/_ether.py b/src/ether/_internal/_ether.py index 1813329..1253922 100644 --- a/src/ether/_internal/_ether.py +++ b/src/ether/_internal/_ether.py @@ -170,7 +170,7 @@ def start(self, ether_id: str, config = None, restart: bool = False, discovery = session_metadata = None - self._logger.debug(f"Start called with ether_id={ether_id}, config={config}, restart={restart}") + self._logger.debug(f"Start called with ether_id={ether_id}, config={config}, restart={restart}, discovery={discovery}") self._ether_id = ether_id # Process configuration @@ -188,6 +188,7 @@ def start(self, ether_id: str, config = None, restart: bool = False, discovery = self._logger.error(f"Failed to process configuration: {e}", exc_info=True) raise + # If we are running on the same machine as the session host, replace the public IP with the local IP try: public_ip = get_ip_address(use_public=True) if config.network.host == public_ip: @@ -203,6 +204,7 @@ def start(self, ether_id: str, config = None, restart: bool = False, discovery = # Start session with network config if discovery: + self._logger.debug("Starting session discovery process...") try: self._ether_session_process = Process( target=session_discovery_launcher, diff --git a/src/ether/_internal/_session.py b/src/ether/_internal/_session.py index 469c2ef..610f3b7 100644 --- a/src/ether/_internal/_session.py +++ b/src/ether/_internal/_session.py @@ -214,7 +214,7 @@ def cleanup(self): if hasattr(self, 'context'): self.context.term() -def session_discovery_launcher(process_id: str, network_config: Optional[EtherNetworkConfig] = None): +def session_discovery_launcher(ether_id: str, network_config: Optional[EtherNetworkConfig] = None, retries: int = 5): """Launch an Ether session process Args: @@ -224,13 +224,13 @@ def session_discovery_launcher(process_id: str, network_config: Optional[EtherNe try: # First try to find existing session # print(f"Process {process_id}: Checking for existing session...") - current_session = EtherSession.get_current_session(timeout=200) + current_session = EtherSession.get_current_session(timeout=200, network_config=network_config) if current_session is None: # print(f"Process {process_id}: No session found, attempting to create new one...") try: session_mgr = EtherSession( - ether_id=process_id, + ether_id=ether_id, network_config=network_config ) while True: From d3cc432a3dccff8fd0964db7538f5dfea2c1e8bd Mon Sep 17 00:00:00 2001 From: shababo Date: Thu, 30 Jan 2025 23:24:44 -0800 Subject: [PATCH 8/8] retry session discovery in initial query --- src/ether/_internal/_ether.py | 2 +- src/ether/_internal/_session.py | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/ether/_internal/_ether.py b/src/ether/_internal/_ether.py index 1253922..ce41099 100644 --- a/src/ether/_internal/_ether.py +++ b/src/ether/_internal/_ether.py @@ -208,7 +208,7 @@ def start(self, ether_id: str, config = None, restart: bool = False, discovery = try: self._ether_session_process = Process( target=session_discovery_launcher, - args=(self._ether_id, self._config.network) + kwargs={"ether_id": self._ether_id, "network_config": self._config.network} ) self._ether_session_process.start() time.sleep(1.0) diff --git a/src/ether/_internal/_session.py b/src/ether/_internal/_session.py index 610f3b7..70c06e2 100644 --- a/src/ether/_internal/_session.py +++ b/src/ether/_internal/_session.py @@ -224,7 +224,12 @@ def session_discovery_launcher(ether_id: str, network_config: Optional[EtherNetw try: # First try to find existing session # print(f"Process {process_id}: Checking for existing session...") - current_session = EtherSession.get_current_session(timeout=200, network_config=network_config) + while retries > 0: + current_session = EtherSession.get_current_session(timeout=200, network_config=network_config) + if current_session is not None: + break + retries -= 1 + time.sleep(0.5) if current_session is None: # print(f"Process {process_id}: No session found, attempting to create new one...")