diff --git a/igvm/__init__.py b/igvm/__init__.py index d3c653cc..53d57226 100644 --- a/igvm/__init__.py +++ b/igvm/__init__.py @@ -3,4 +3,4 @@ Copyright (c) 2024 InnoGames GmbH """ -VERSION = (2, 2, 5) +VERSION = (2, 3, 0) diff --git a/igvm/cli.py b/igvm/cli.py index 6f686236..e1698893 100644 --- a/igvm/cli.py +++ b/igvm/cli.py @@ -8,7 +8,7 @@ from logging import StreamHandler, root as root_logger import time -from fabric.network import disconnect_all +from igvm.host import disconnect_all from igvm.commands import ( change_address, @@ -60,7 +60,7 @@ def format_help(self): out.append(ColorFormatters.BOLD.format(choice)) if subparser.get_default('func').__doc__: out.append('\n'.join( - '\t{}'.format(l.strip()) for l in subparser + f'\t{line.strip()}' for line in subparser .get_default('func').__doc__.strip().splitlines() )) out.append('\n\t{}'.format(subparser.format_usage())) @@ -495,9 +495,8 @@ def main(): try: args.pop('func')(**args) finally: - # Fabric requires the disconnect function to be called after every - # use. We are also taking our chance to disconnect from - # the hypervisors. + # Disconnect all SSH connections. We are also taking our chance + # to disconnect from the hypervisors. disconnect_all() close_virtconns() diff --git a/igvm/colors.py b/igvm/colors.py new file mode 100644 index 00000000..8240f175 --- /dev/null +++ b/igvm/colors.py @@ -0,0 +1,28 @@ +"""igvm - Color output helpers + +Copyright (c) 2026 InnoGames GmbH + +Replacement for fabric.colors which is not available in fabric 3.x. +""" + + +def _wrap(code: int, text: str, bold: bool = False) -> str: + if bold: + return f'\033[1;{code}m{text}\033[0m' + return f'\033[{code}m{text}\033[0m' + + +def green(text: str, bold: bool = False) -> str: + return _wrap(32, text, bold) + + +def red(text: str, bold: bool = False) -> str: + return _wrap(31, text, bold) + + +def yellow(text: str, bold: bool = False) -> str: + return _wrap(33, text, bold) + + +def white(text: str, bold: bool = False) -> str: + return _wrap(37, text, bold) diff --git a/igvm/commands.py b/igvm/commands.py index d1999710..068fe4bf 100644 --- a/igvm/commands.py +++ b/igvm/commands.py @@ -15,8 +15,8 @@ from adminapi import parse from adminapi.dataset import Query, DatasetError from adminapi.filters import Any, BaseFilter, StartsWith, Contains -from fabric.colors import green, red, white, yellow -from fabric.network import disconnect_all +from igvm.colors import green, red, white, yellow +from igvm.host import disconnect_all from jinja2 import Environment, PackageLoader from libvirt import libvirtError @@ -28,7 +28,6 @@ InconsistentAttributeError, InvalidStateError, NetworkError, ) -from igvm.host import with_fabric_settings from igvm.hypervisor import Hypervisor from igvm.hypervisor_preferences import sort_by_preference from igvm.settings import ( @@ -63,7 +62,6 @@ def _check_defined(vm, fail_hard=True): log.info(error) -@with_fabric_settings def evacuate( hv_hostname: str, target_hv_query: Optional[str] = None, @@ -121,7 +119,6 @@ def evacuate( ) -@with_fabric_settings def vcpu_set(vm_hostname, count, offline=False): """Change the number of CPUs in a VM""" with ExitStack() as es: @@ -182,7 +179,6 @@ def vcpu_set(vm_hostname, count, offline=False): vm.start() -@with_fabric_settings def mem_set(vm_hostname, size, offline=False): """Change the memory size of a VM @@ -231,7 +227,6 @@ def mem_set(vm_hostname, size, offline=False): vm.start() -@with_fabric_settings def disk_set(vm_hostname, size): """Change the disk size of a VM @@ -272,7 +267,6 @@ def disk_set(vm_hostname, size): vm.dataset_obj.commit() -@with_fabric_settings def change_address( vm_hostname, new_address, offline=False, migrate=False, allow_reserved_hv=False, @@ -351,7 +345,6 @@ def change_address( vm.start() -@with_fabric_settings def vm_build( vm_hostname: str, run_puppet: bool = True, @@ -455,7 +448,6 @@ def vm_build( vm.dataset_obj.commit() -@with_fabric_settings # NOQA: C901 def vm_migrate( vm_hostname: str = None, vm_object=None, @@ -513,7 +505,7 @@ def vm_migrate( # likely that a specific hypervisor was requested. Any other filter # could resolve to multiple HVs. and (not isinstance(hv_filter['hostname'], BaseFilter) - or type(hv_filter['hostname']) == BaseFilter) + or type(hv_filter['hostname']) is BaseFilter) ): hypervisor = es.enter_context(_get_hypervisor( hv_filter['hostname'], @@ -616,7 +608,6 @@ def _reset_hypervisor(): previous_hypervisor.undefine_vm(_vm) -@with_fabric_settings def vm_start(vm_hostname): """Start a VM""" with _get_vm(vm_hostname) as vm: @@ -635,7 +626,6 @@ def vm_start(vm_hostname): ) -@with_fabric_settings def vm_stop(vm_hostname, force=False): """Gracefully stop a VM""" with _get_vm(vm_hostname, allow_retired=True) as vm: @@ -660,7 +650,6 @@ def vm_stop(vm_hostname, force=False): log.info('"{}" is stopped.'.format(vm.fqdn)) -@with_fabric_settings def vm_restart(vm_hostname, force=False, no_redefine=False): """Restart a VM @@ -696,7 +685,6 @@ def vm_restart(vm_hostname, force=False, no_redefine=False): log.info('"{}" is restarted.'.format(vm.fqdn)) -@with_fabric_settings def vm_delete(vm_hostname, retire=False): """Delete the VM from the hypervisor and from serveradmin @@ -762,7 +750,6 @@ def vm_delete(vm_hostname, retire=False): ) -@with_fabric_settings def vm_sync(vm_hostname): """Synchronize VM resource attributes to Serveradmin @@ -822,7 +809,6 @@ def vm_define(vm_hostname): vm_hostname, vm_dataset_obj['hypervisor']['hostname'])) -@with_fabric_settings # NOQA: C901 def host_info(vm_hostname): """Extract runtime information about a VM @@ -933,7 +919,6 @@ def _progress_bar(free_key, capacity_key, result_key, unit): print('{} : {}'.format(k.ljust(max_key_len), value)) -@with_fabric_settings def vm_rename(vm_hostname, new_hostname, offline=False): """Redefine the VM on the same hypervisor with a different name @@ -971,7 +956,6 @@ def vm_rename(vm_hostname, new_hostname, offline=False): vm.aws_rename(new_hostname) -@with_fabric_settings def clean_cert(hostname: str): """Revoke and delete a Puppet certificate from the Puppet CA""" vm = Query({'hostname': hostname}, ['hostname', 'puppet_ca']).get() diff --git a/igvm/drbd.py b/igvm/drbd.py index 7cc81651..327091b6 100644 --- a/igvm/drbd.py +++ b/igvm/drbd.py @@ -3,7 +3,6 @@ Copyright (c) 2018 InnoGames GmbH """ -from adminapi import api from contextlib import contextmanager from igvm.exceptions import RemoteCommandError from igvm.settings import DEFAULT_VG_NAME @@ -188,7 +187,6 @@ def get_host_config(self): addr=self.hv.dataset_obj['intern_ip'], port=self.get_device_port(), dm_minor=self.get_device_minor(), - lv_name=self.lv_name, disk=( 'mapper/{}_orig'.format(self.lv_name) if self.master_role diff --git a/igvm/host.py b/igvm/host.py index 27f99dd3..f93a5585 100644 --- a/igvm/host.py +++ b/igvm/host.py @@ -3,29 +3,80 @@ Copyright (c) 2018 InnoGames GmbH """ +import logging +import shlex +import socket +from datetime import datetime, timezone from io import BytesIO -from datetime import datetime - -import fabric.api -import fabric.state -from fabric.contrib import files from uuid import uuid4 -from paramiko import transport +import fabric +import paramiko.ssh_exception + from igvm.exceptions import RemoteCommandError, InvalidStateError -from igvm.settings import COMMON_FABRIC_SETTINGS +from igvm.settings import ( + make_fabric_config, + FABRIC_CONNECTION_ATTEMPTS, + FABRIC_CONNECTION_DEFAULTS, + FABRIC_RUN_DEFAULTS, +) from adminapi.dataset import DatasetError +log = logging.getLogger(__name__) + +# Registry of active connections for disconnect_all() +_active_connections = set() + + +class CommandResult(str): + """String subclass wrapping invoke.runners.Result for backward compat. -def with_fabric_settings(fn): - """Decorator to run a function with COMMON_FABRIC_SETTINGS.""" - def decorator(*args, **kwargs): - with fabric.api.settings(**COMMON_FABRIC_SETTINGS): - return fn(*args, **kwargs) - decorator.__name__ = '{}_with_fabric'.format(fn.__name__) - decorator.__doc__ = fn.__doc__ - return decorator + Fabric 1.x run()/sudo() returned a string-like object. Fabric 3.x + returns invoke.runners.Result. This wrapper lets callers keep using + .strip(), int(result), 'text' in result, etc. while also exposing + .ok, .failed, .return_code, .stdout, .stderr. + """ + + def __new__(cls, result): + # Fabric 1.x automatically stripped trailing whitespace from the + # string representation. Fabric 3.x does not, so we strip here + # to avoid embedded newlines breaking commands that interpolate + # the result (e.g. mktemp output used as a mount path). + stdout = result.stdout.strip() if result.stdout else '' + instance = super().__new__(cls, stdout) + instance._result = result + return instance + + @property + def return_code(self): + return self._result.return_code + + @property + def ok(self): + return self._result.ok + + @property + def failed(self): + return self._result.failed + + @property + def stdout(self): + return self._result.stdout if self._result.stdout else '' + + @property + def stderr(self): + return self._result.stderr if self._result.stderr else '' + + +def disconnect_all(): + """Close all tracked SSH connections.""" + for conn in list(_active_connections): + try: + conn.close() + except Exception: + pass + _active_connections.clear() class Host(object): @@ -35,6 +86,7 @@ def __init__(self, dataset_obj): self.dataset_obj = dataset_obj self.route_network = dataset_obj['route_network']['hostname'] self.fqdn = self.dataset_obj['hostname'] # TODO: Remove + self._connection = None def __str__(self): return self.fqdn @@ -61,69 +113,114 @@ def match_uid_name(self, uid_name): """Check if a given uid_name matches this host""" return uid_name.split('_', 1)[0] == str(self.dataset_obj['object_id']) - def fabric_settings(self, *args, **kwargs): - """Builds a fabric context manager to run commands on this host.""" - settings = COMMON_FABRIC_SETTINGS.copy() - settings.update({ - 'abort_exception': RemoteCommandError, - 'host_string': str(self.dataset_obj['hostname']), - }) - settings.update(kwargs) - return fabric.api.settings(*args, **settings) + def _get_connection(self): + """Return a cached fabric.Connection for this host.""" + if self._connection is None or not self._connection.is_connected: + hostname = str(self.dataset_obj['hostname']) + self._connection = fabric.Connection( + hostname, config=make_fabric_config(), **FABRIC_CONNECTION_DEFAULTS + ) + _active_connections.add(self._connection) + return self._connection + + def close_connection(self): + """Close and discard the cached connection.""" + if self._connection is not None: + try: + self._connection.close() + except Exception: + pass + _active_connections.discard(self._connection) + self._connection = None def run(self, *args, **kwargs): """Runs a command on the remote host. :param warn_only: If set, no exception is raised if the command fails :param silent: If set, no output is written for successful runs""" - settings = [] - warn_only = kwargs.get('warn_only', False) - with_sudo = kwargs.get('with_sudo', True) - kwargs['pty'] = kwargs.get('pty', True) - if kwargs.get('silent', False): - hide = 'everything' if warn_only else 'commands' - settings.append(fabric.api.hide(hide)) - - # Purge settings that should not be passed to run() - for setting in ['warn_only', 'silent', 'with_sudo']: - if setting in kwargs: - del kwargs[setting] - - with self.fabric_settings(*settings, warn_only=warn_only): + warn_only = kwargs.pop('warn_only', False) + with_sudo = kwargs.pop('with_sudo', True) + silent = kwargs.pop('silent', False) + + # Build run kwargs + run_kwargs = dict(FABRIC_RUN_DEFAULTS) + run_kwargs['pty'] = kwargs.pop('pty', run_kwargs.get('pty', True)) + # Always suppress invoke's UnexpectedExit so we can raise our own + # RemoteCommandError instead (checked below after the call). + run_kwargs['warn'] = True + + if silent: + run_kwargs['hide'] = True + + # Pass through any remaining kwargs (e.g. shell, shell_escape) + # Map shell_escape to Fabric 3.x equivalent if present + kwargs.pop('shell_escape', None) + + # 'shell' kwarg: In Fabric 1.x, shell=False meant don't wrap in shell. + # In Fabric 3.x/invoke, the equivalent is setting shell to empty string + # or using the command directly. We'll pass it through. + if 'shell' in kwargs: + shell_val = kwargs.pop('shell') + if shell_val is False: + run_kwargs['shell'] = '' + + run_kwargs.update(kwargs) + + # Remove abort_exception if present (Fabric 1.x only) + run_kwargs.pop('abort_exception', None) + + conn = self._get_connection() + runner = conn.sudo if with_sudo else conn.run + + # Fabric 3's sudo() prepends "sudo -S -p '...' " directly to the + # command without wrapping it in a shell, unlike Fabric. + # This breaks commands containing shell constructs (while, for, if, + # pipes, etc.). Wrap in "bash -c '...'" to restore the old behavior. + if with_sudo and args: + cmd = args[0] + args = (f"bash -c {shlex.quote(cmd)}",) + args[1:] + + for attempt in range(FABRIC_CONNECTION_ATTEMPTS): try: - if with_sudo: - return fabric.api.sudo(*args, **kwargs) + result = runner(*args, **run_kwargs) + if not warn_only and result.failed: + raise RemoteCommandError( + f'Command failed with return code {result.return_code}: {args[0] if args else ""}' + ) + return CommandResult(result) + except ( + paramiko.ssh_exception.SSHException, + paramiko.ssh_exception.NoValidConnectionsError, + socket.error, + EOFError, + ): + if attempt < FABRIC_CONNECTION_ATTEMPTS - 1: + log.warning( + 'Connection lost, retrying (attempt %d/%d)...', + attempt + 2, FABRIC_CONNECTION_ATTEMPTS, + ) + self.close_connection() + conn = self._get_connection() + runner = conn.sudo if with_sudo else conn.run else: - return fabric.api.run(*args, **kwargs) - except transport.socket.error: - # Retry once if connection was lost - host = fabric.api.env.host_string - if host and host in fabric.state.connections: - fabric.state.connections[host].get_transport().close() - if with_sudo: - return fabric.api.sudo(*args, **kwargs) - else: - return fabric.api.run(*args, **kwargs) + raise - def file_exists(self, *args, **kwargs): - """Run a fabric.contrib.files.exists on this host with sudo.""" - with self.fabric_settings(): - try: - return files.exists(*args, **kwargs) - except transport.socket.error: - # Retry once if connection was lost - host = fabric.api.env.host_string - if host and host in fabric.state.connections: - fabric.state.connections[host].get_transport().close() - return files.exists(*args, **kwargs) + def file_exists(self, path, *args, **kwargs): + """Check if a file exists on the remote host.""" + conn = self._get_connection() + result = conn.sudo( + f'test -e {path}', + warn=True, hide=True, + ) + return result.ok def read_file(self, path): """Reads a file from the remote host and returns contents.""" if '*' in path: raise ValueError('No globbing supported') - with self.fabric_settings(fabric.api.hide('commands')): - fd = BytesIO() - fabric.api.get(path, fd) - return fd.getvalue() + conn = self._get_connection() + fd = BytesIO() + conn.get(path, fd) + return fd.getvalue() def put(self, remote_path, local_path, mode='0644'): """Same as Fabric's put but with working sudo permissions @@ -132,22 +229,21 @@ def put(self, remote_path, local_path, mode='0644'): broken, at least for mounted VM. This is why we run extra commands in here. """ - with self.fabric_settings(): - tempfile = '/tmp/' + str(uuid4()) - fabric.api.put(local_path, tempfile) - self.run( - 'mv {0} {1} ; chmod {2} {1}' - .format(tempfile, remote_path, mode) - ) + conn = self._get_connection() + tempfile = '/tmp/' + str(uuid4()) + conn.put(local_path, tempfile) + self.run( + f'mv {tempfile} {remote_path} ; chmod {mode} {remote_path}' + ) - def acquire_lock(self, allow_fail=False): + def acquire_lock(self): if self.dataset_obj['igvm_locked'] is not None: raise InvalidStateError( 'Server "{0}" is already being worked on by another igvm' .format(self.dataset_obj['hostname']) ) - self.dataset_obj['igvm_locked'] = datetime.utcnow() + self.dataset_obj['igvm_locked'] = datetime.now(timezone.utc) try: self.dataset_obj.commit() except DatasetError: diff --git a/igvm/hypervisor.py b/igvm/hypervisor.py index bbcb3726..2dc62479 100644 --- a/igvm/hypervisor.py +++ b/igvm/hypervisor.py @@ -131,15 +131,11 @@ def vm_lv_update_name(self, vm): """ old_name = self.get_volume_by_vm(vm).name() new_name = vm.uid_name - with self.fabric_settings(): - if old_name != new_name: - self.run( - 'lvrename {} {}'.format( - self.get_volume_by_vm(vm).path(), - vm.uid_name - ) - ) - self.get_storage_pool(vg_name=vm.vg_name).refresh() + if old_name != new_name: + self.run( + f'lvrename {self.get_volume_by_vm(vm).path()} {vm.uid_name}' + ) + self.get_storage_pool(vg_name=vm.vg_name).refresh() def vm_mount_path(self, vm): """Returns the mount path for a VM or raises HypervisorError if not @@ -471,7 +467,7 @@ def download_and_extract_image(self, image, target_dir): 'set -e ; ' 'flock -w 120 9 ; ' 'curl -o {img_path}/{img_file}.md5 {md5_url} ; ' - 'sed -Ei \'s_ (.*/)?([a-zA-Z0-9\.\-]+)$_ {img_path}/\\2_\' ' + "sed -Ei 's_ (.*/)?([a-zA-Z0-9\\.\\-]+)$_ {img_path}/\\2_' " '{img_path}/{img_file}.md5 ; ' 'md5sum -c {img_path}/{img_file}.md5 || ' 'curl -o {img_path}/{img_file} {img_url} ; ' @@ -872,14 +868,14 @@ def get_free_disk_size_gib(self, safe=True, vg_name=DEFAULT_VG_NAME): pool_info = self.get_storage_pool(vg_name=vg_name).info() # Floor instead of ceil because we check free instead of used space vg_size_gib = math.floor(float(pool_info[3]) / 1024 ** 3) - if safe is True: + if safe: vg_size_gib -= RESERVED_DISK[self.get_storage_type()] return vg_size_gib def mount_temp(self, device, suffix=''): """Mounts given device into temporary path""" - mount_dir = self.run('mktemp -d --suffix {}'.format(suffix)) - self.run('mount {0} {1}'.format(device, mount_dir)) + mount_dir = self.run(f'mktemp -d --suffix {suffix}') + self.run(f'mount {device} {mount_dir}') return mount_dir def umount_temp(self, device_or_path): @@ -903,7 +899,7 @@ def umount_temp(self, device_or_path): 'umount {0}'.format(device_or_path), warn_only=(i < retry - 1), ) - if res.succeeded: + if res.ok: return def remove_temp(self, mount_path): diff --git a/igvm/kvm.py b/igvm/kvm.py index b66a18b5..2bb49697 100644 --- a/igvm/kvm.py +++ b/igvm/kvm.py @@ -426,7 +426,7 @@ def generate_domain_xml(hypervisor, vm): log.info('KVM: Memory hotplug disabled, requires qemu 2.3') # Remove whitespace and re-indent properly. - out = re.sub(b'>\s+<', b'><', ElementTree.tostring(tree)) + out = re.sub(rb'>\s+<', b'><', ElementTree.tostring(tree)) domain_xml = minidom.parseString(out).toprettyxml() return domain_xml diff --git a/igvm/puppet.py b/igvm/puppet.py index 0d656055..0c221cad 100644 --- a/igvm/puppet.py +++ b/igvm/puppet.py @@ -7,11 +7,16 @@ from time import sleep from adminapi.dataset import Query, DatasetObject -from fabric.api import settings -from fabric.operations import sudo + +import fabric from igvm.exceptions import ConfigError -from igvm.settings import COMMON_FABRIC_SETTINGS +from igvm.host import CommandResult +from igvm.settings import ( + make_fabric_config, + FABRIC_CONNECTION_DEFAULTS, + IGVM_SSH_USER, +) logger = getLogger(__name__) @@ -93,13 +98,22 @@ def clean_cert(vm: DatasetObject, retries: int = 10) -> None: def run_cmd(host: str, cmd: str): - if 'user' in COMMON_FABRIC_SETTINGS: - user = COMMON_FABRIC_SETTINGS['user'] - else: - user = None - - with settings(host_string=host, user=user, warn_only=True): - return sudo(cmd, quiet=True, pty=False, shell=False) + conn_kwargs = dict(FABRIC_CONNECTION_DEFAULTS) + if IGVM_SSH_USER: + conn_kwargs['user'] = IGVM_SSH_USER + + conn = fabric.Connection(host, config=make_fabric_config(), **conn_kwargs) + try: + # Prepend a space to the command so that invoke's sudo() produces + # the two-space gap between the prompt and the command that the + # SSH_ORIGINAL_COMMAND whitelist on puppet CA hosts expects: + # "sudo -S -p 'sudo password:' " + # invoke builds: "sudo -S -p '{prompt}' {cmd}" — one space from the + # format string plus our leading space gives the required two. + result = conn.sudo(' ' + cmd, hide=True, warn=True, pty=False) + return CommandResult(result) + finally: + conn.close() def find_puppet_executable(host: str) -> str: @@ -152,7 +166,10 @@ def clean_cert_v6( # Check if the cleaning was successful or if there was nothing to # clean in the first place - if res.return_code == 1 and 'Could not find files to clean' in res: + if res.return_code == 1 and ( + 'Could not find files to clean' in res + or 'Could not find files to clean' in res.stderr + ): logger.debug( f'Skip revoking of {vm_host} because there is no valid ' 'certificate known to the CA', diff --git a/igvm/settings.py b/igvm/settings.py index 1c889ebc..a0a42d7e 100644 --- a/igvm/settings.py +++ b/igvm/settings.py @@ -6,6 +6,8 @@ from os import environ from sys import stdout +import fabric + from libvirt import ( VIR_MIGRATE_PEER2PEER, VIR_MIGRATE_TLS, @@ -21,21 +23,31 @@ OverAllocation, ) -COMMON_FABRIC_SETTINGS = dict( - disable_known_hosts=True, - use_ssh_config=True, - always_use_pty=stdout.isatty(), - forward_agent=True, - shell='/bin/sh -c', - timeout=5, - connection_attempts=3, - remote_interrupt=True, +IGVM_SSH_USER = environ.get('IGVM_SSH_USER') + +FABRIC_CONNECTION_DEFAULTS = dict( + connect_timeout=5, + connect_kwargs={ + 'allow_agent': True, + }, +) + +if IGVM_SSH_USER: + FABRIC_CONNECTION_DEFAULTS['user'] = IGVM_SSH_USER + +# Fabric 1.x used 'sudo password:' as the sudo prompt, while Fabric 3.x +# defaults to '[sudo] password:'. Remote hosts with SSH_ORIGINAL_COMMAND +# whitelists match the old format, so we must keep using it. +FABRIC_SUDO_PROMPT = 'sudo password:' + +def make_fabric_config(): + return fabric.Config(overrides={'sudo': {'prompt': FABRIC_SUDO_PROMPT}}) + +FABRIC_RUN_DEFAULTS = dict( + pty=stdout.isatty(), ) -# Can't not add a key with dict built above and None value gets interpreted -# as "None" username, thus separate code. -if 'IGVM_SSH_USER' in environ: - COMMON_FABRIC_SETTINGS['user'] = environ.get('IGVM_SSH_USER') +FABRIC_CONNECTION_ATTEMPTS = 3 DEFAULT_VG_NAME = 'xen-data' # Reserved pool space on Hypervisor diff --git a/igvm/vm.py b/igvm/vm.py index 2bcd6da8..6ff68b21 100644 --- a/igvm/vm.py +++ b/igvm/vm.py @@ -8,13 +8,12 @@ import logging import os import re -import stat +import socket import time import botocore.exceptions import tqdm from base64 import b64decode -from grp import getgrnam from hashlib import sha1, sha256 from io import BytesIO from pathlib import Path @@ -23,10 +22,10 @@ from uuid import uuid4 import boto3 +import fabric +import paramiko.ssh_exception from botocore.exceptions import ClientError, CapacityNotAvailableError -from fabric.api import cd, get, hide, put, run, settings -from fabric.contrib.files import upload_template -from fabric.exceptions import NetworkError +from jinja2 import Environment, FileSystemLoader from json.decoder import JSONDecodeError from urllib.error import HTTPError from urllib.request import Request, urlopen @@ -40,6 +39,7 @@ AWS_INSTANCES_OVERVIEW_FILE, AWS_INSTANCES_OVERVIEW_FILE_ETAG, AWS_INSTANCES_OVERVIEW_URL, + FABRIC_CONNECTION_DEFAULTS, MEM_BLOCK_BOUNDARY_GiB, MEM_BLOCK_SIZE_GiB, DEFAULT_VG_NAME, @@ -87,65 +87,53 @@ def __init__(self, dataset_obj, hypervisor=None): else: self.vg_name = DEFAULT_VG_NAME - def vm_host(self): - """ Return correct ssh host for mounted and unmounted vm """ - - if self.mounted: - return self.hypervisor.fabric_settings() - return self.fabric_settings() - def vm_path(self, path=''): """ Append correct prefix to reach VM's / directory """ if self.mounted: - return '{}/{}'.format( - self.hypervisor.vm_mount_path(self), - path, - ) - return '/{}'.format(path) + return f'{self.hypervisor.vm_mount_path(self)}/{path}' + return f'/{path}' - def run(self, command, silent=False, with_sudo=True): + def run(self, command, silent=False, with_sudo=True, **kwargs): """ Same as Fabric's run() but works on mounted or running vm When running in a mounted VM image, run everything in chroot and in separate shell inside chroot. Normally Fabric runs shell around commands. """ - with self.vm_host(): - if self.mounted: - return self.hypervisor.run( - 'chroot {} /bin/sh -c \'{}\''.format( - self.vm_path(''), command, - ), - shell=False, shell_escape=True, - silent=silent, - with_sudo=with_sudo, - ) - return super(VM, self).run(command, silent=silent) + if self.mounted: + return self.hypervisor.run( + f'chroot {self.vm_path("")} /bin/sh -c \'{command}\'', + shell=False, shell_escape=True, + silent=silent, + with_sudo=with_sudo, + ) + return super(VM, self).run( + command, silent=silent, with_sudo=with_sudo, **kwargs, + ) - def read_file(self, path): + def read_file(self, path: str): """Read a file from a running VM or a mounted image on HV.""" - with self.vm_host(): - return super(VM, self).read_file(self.vm_path(path)) + if self.mounted: + return self.hypervisor.read_file(self.vm_path(path)) + return super(VM, self).read_file(self.vm_path(path)) def upload_template(self, filename, destination, context=None): """" Same as Fabric's template() but works on mounted or running vm """ template_dir = os.path.join(os.path.dirname(__file__), 'templates') - with self.vm_host(): - return upload_template( - filename, - self.vm_path(destination), - context, - backup=False, - use_jinja=True, - template_dir=template_dir, - use_sudo=True, - ) + env = Environment(loader=FileSystemLoader(template_dir)) + template = env.get_template(filename) + rendered = template.render(**(context or {})) + fd = BytesIO(rendered.encode('utf-8')) + self.put(self.vm_path(destination), fd) def get(self, remote_path, local_path): """" Same as Fabric's get() but works on mounted or running vm """ - with self.vm_host(): - return get(self.vm_path(remote_path), local_path, temp_dir='/tmp') + if self.mounted: + conn = self.hypervisor._get_connection() + else: + conn = self._get_connection() + conn.get(self.vm_path(remote_path), local_path) def put(self, remote_path, local_path, mode='0644'): """ Same as Fabric's put() but works on mounted or running vm @@ -154,12 +142,17 @@ def put(self, remote_path, local_path, mode='0644'): seems broken, at least for mounted VM. This is why we run extra commands here. """ - with self.vm_host(): - tempfile = '/tmp/' + str(uuid4()) - put(local_path, self.vm_path(tempfile)) - self.run('mv {0} {1} ; chmod {2} {1}'.format( - tempfile, remote_path, mode - )) + if self.mounted: + host_obj = self.hypervisor + else: + host_obj = self + + conn = host_obj._get_connection() + tempfile = '/tmp/' + str(uuid4()) + conn.put(local_path, tempfile) + host_obj.run( + f'mv {tempfile} {remote_path} ; chmod {mode} {remote_path}' + ) def set_state(self, new_state, transaction=None): """Changes state of VM for LB and Nagios downtimes""" @@ -169,7 +162,7 @@ def set_state(self, new_state, transaction=None): return if new_state == self.previous_state: return - log.debug('Setting VM to state {}'.format(new_state)) + log.debug(f'Setting VM to state {new_state}') self.dataset_obj['state'] = new_state self.dataset_obj.commit() if transaction: @@ -227,7 +220,7 @@ def check_serveradmin_config(self): ( 'memory', lambda v: v % mul_numa_nodes == 0, - 'memory must be multiple of {}MiB'.format(mul_numa_nodes), + f'memory must be multiple of {mul_numa_nodes}MiB', ), ('num_cpu', lambda v: v > 0, 'num_cpu must be > 0'), ('os', lambda v: True, 'os must be set'), @@ -273,7 +266,7 @@ def check_serveradmin_config(self): for attr, check, err in validations: value = self.dataset_obj[attr] if not value: - raise ConfigError('"{}" attribute is not set'.format(attr)) + raise ConfigError(f'"{attr}" attribute is not set') if not check(value): raise ConfigError(err) @@ -401,8 +394,7 @@ def aws_start(self): ) log.debug(response) if current_state and current_state == AWS_RETURN_CODES['running']: - log.info('{} is already running.'.format( - self.dataset_obj['hostname'])) + log.info(f'{self.dataset_obj["hostname"]} is already running.') return except ClientError as e: raise VMError(e) @@ -414,7 +406,7 @@ def aws_start(self): if not host_up: raise VMError('The server is not reachable with SSH') - + def aws_restart(self): """AWS restart @@ -426,7 +418,7 @@ def aws_restart(self): InstanceIds=[self.dataset_obj['aws_instance_id']], DryRun=False ) - + except ClientError as e: raise VMError(e) @@ -583,8 +575,7 @@ def aws_delete(self): def is_running(self): if self.dataset_obj['datacenter_type'] not in ['aws.dct', 'kvm.dct']: raise NotImplementedError( - 'This operation is not yet supported for {}'.format( - self.dataset_obj['datacenter_type']) + f'This operation is not yet supported for {self.dataset_obj["datacenter_type"]}' ) if self.dataset_obj['datacenter_type'] == 'kvm.dct': return self.hypervisor.vm_running(self) @@ -600,9 +591,7 @@ def wait_for_running(self, running=True, timeout=60): """ action = 'boot' if running else 'shutdown' for i in range(timeout, 1, -1): - print( - 'Waiting for VM "{}" to {}... {} s'.format( - self.fqdn, action, i)) + log.info(f'Waiting for VM "{self.fqdn}" to {action}... {i} s') if self.hypervisor.vm_running(self) == running: return True time.sleep(1) @@ -686,7 +675,7 @@ def build( self.hypervisor.check_vm(self, offline=True) if not run_puppet or self.dataset_obj['puppet_disabled']: - log.warn( + log.warning( 'Puppet is disabled on the VM. It will not receive network ' 'configuration. Expect things to go south.' ) @@ -745,7 +734,7 @@ def build( self.run('/buildvm-postboot') self.run('rm /buildvm-postboot') - log.info('"{}" is successfully built.'.format(self.fqdn)) + log.info(f'"{self.fqdn}" is successfully built.') def aws_build(self, run_puppet: bool = True, @@ -830,7 +819,7 @@ def aws_build(self, break except ClientError as e: raise VMError(e) - except CapacityNotAvailableError as e: + except CapacityNotAvailableError: continue if run_puppet: @@ -839,8 +828,7 @@ def aws_build(self, self.dataset_obj['aws_instance_id'] = response['Instances'][0][ 'InstanceId'] - log.info('waiting for {} to be started'.format( - self.dataset_obj['hostname'])) + log.info(f'waiting for {self.dataset_obj["hostname"]} to be started') vm_setup = tqdm.tqdm( total=timeout_vm_setup, desc='vm_setup', position=0) cloud_init = tqdm.tqdm( @@ -868,26 +856,33 @@ def aws_build(self, time.sleep(1) continue - with settings( - hide('aborts'), - host_string=self.dataset_obj['hostname'], - warn_only=True, - abort_on_prompts=True, + try: + conn = fabric.Connection( + self.dataset_obj['hostname'], + **FABRIC_CONNECTION_DEFAULTS, + ) + result = conn.run( + 'find /var/lib/cloud/instance/boot-finished', + hide=True, warn=True, + ) + conn.close() + if result.ok: + cloud_init.update(timeout_cloud_init - retry - 1) + break + except ( + SystemExit, + paramiko.ssh_exception.SSHException, + paramiko.ssh_exception.NoValidConnectionsError, + socket.error, + OSError, + EOFError, ): - try: - if run( - 'find /var/lib/cloud/instance/boot-finished', - quiet=True - ).succeeded: - cloud_init.update(timeout_cloud_init - retry - 1) - break - except (SystemExit, NetworkError): - time.sleep(1) + time.sleep(1) # TODO: Handle overrun timeout self.create_ssh_keys() - log.info('"{}" is successfully built in AWS.'.format(self.fqdn)) + log.info(f'"{self.fqdn}" is successfully built in AWS.') def rename(self, new_hostname): """Rename the VM""" @@ -956,10 +951,11 @@ def prepare_vm(self): # Copy resolv.conf from Hypervisor fd = BytesIO() - with self.hypervisor.fabric_settings( - cd(self.hypervisor.vm_mount_path(self)) - ): - get('/etc/resolv.conf', fd) + conn = self.hypervisor._get_connection() + conn.get( + f'{self.hypervisor.vm_mount_path(self)}/etc/resolv.conf', + fd, + ) self.put('/etc/resolv.conf', fd) self.create_ssh_keys() @@ -978,16 +974,16 @@ def create_ssh_keys(self): # This will also create the public key files. for key_id, key_type in key_types: self.run( - 'ssh-keygen -q -t {0} -N "" ' - '-f /etc/ssh/ssh_host_{0}_key'.format(key_type)) + f'ssh-keygen -q -t {key_type} -N "" ' + f'-f /etc/ssh/ssh_host_{key_type}_key') fd = BytesIO() - self.get('/etc/ssh/ssh_host_{0}_key.pub'.format(key_type), fd) + self.get(f'/etc/ssh/ssh_host_{key_type}_key.pub', fd) pub_key = b64decode(fd.getvalue().split(None, 2)[1]) for fp_id, fp_type in fp_types: - self.dataset_obj['sshfp'].add('{} {} {}'.format( - key_id, fp_id, fp_type(pub_key).hexdigest() - )) + self.dataset_obj['sshfp'].add( + f'{key_id} {fp_id} {fp_type(pub_key).hexdigest()}' + ) def run_puppet(self, clear_cert=False, debug=False, tries=2): """Runs Puppet in chroot on the hypervisor.""" @@ -1001,19 +997,15 @@ def run_puppet(self, clear_cert=False, debug=False, tries=2): puppet_bin = PUPPET_BINARY_PATH.get(self.dataset_obj['os']) if puppet_bin is None: raise ConfigError(f'Puppet binary configuration for OS {self.dataset_obj["os"]} missing') + debug_flag = ' --debug' if debug else '' puppet_command = ( - '( {} agent ' - '--detailed-exitcodes ' - '--fqdn={} --server={} --ca_server={} ' - '--no-report --waitforcert=10 --onetime --no-daemonize ' - '--skip_tags=chroot_unsafe --verbose{} ) ;' - '[ $? -eq 2 ]'.format( - puppet_bin, - self.fqdn, - self.dataset_obj['puppet_master'], - self.dataset_obj['puppet_ca'], - ' --debug' if debug else '', - ) + f'( {puppet_bin} agent ' + f'--detailed-exitcodes ' + f'--fqdn={self.fqdn} --server={self.dataset_obj["puppet_master"]} ' + f'--ca_server={self.dataset_obj["puppet_ca"]} ' + f'--no-report --waitforcert=10 --onetime --no-daemonize ' + f'--skip_tags=chroot_unsafe --verbose{debug_flag} ) ;' + f'[ $? -eq 2 ]' ) # The Puppetserver fails sometimes with HTTP 500 or isn't reachable @@ -1039,7 +1031,7 @@ def block_autostart(self): self.put('/usr/sbin/policy-rc.d', fd, '0755') def unblock_autostart(self): - self.run('rm /usr/sbin/policy-rc.d') + self.run('rm -f /usr/sbin/policy-rc.d') def copy_postboot_script(self, script): self.put('/buildvm-postboot', script, '0755') @@ -1086,52 +1078,46 @@ def aws_disk_set(self, size: int, timeout_disk_resize: int = 60) -> None: if volume_state['ModificationState'] == 'optimizing': raise VMError( - 'disk resize already in progress ' - 'for {} (state: {})'.format( - self.dataset_obj['hostname'], - volume_state['ModificationState']) + f'disk resize already in progress ' + f'for {self.dataset_obj["hostname"]} (state: {volume_state["ModificationState"]})' ) except ClientError: log.debug( - 'First disk resize of {} ({}) - ' - 'no modification state available in AWS'.format( - self.dataset_obj['hostname'], volume_id) + f'First disk resize of {self.dataset_obj["hostname"]} ({volume_id}) - ' + f'no modification state available in AWS' ) pass self.ec2c.modify_volume(VolumeId=volume_id, Size=int(size)) partition = self.run('findmnt -nro SOURCE /') - disk = self.run('lsblk -nro PKNAME {}'.format(partition)) - new_disk_size = self.run('lsblk -bdnro size /dev/{}'.format(disk)) + disk = self.run(f'lsblk -nro PKNAME {partition}') + new_disk_size = self.run(f'lsblk -bdnro size /dev/{disk}') new_disk_size_gib = int(new_disk_size) / 1024 / 1024 / 1024 while timeout_disk_resize and size != new_disk_size_gib: timeout_disk_resize -= 1 time.sleep(1) - new_disk_size = self.run('lsblk -bdnro size /dev/{}'.format(disk)) + new_disk_size = self.run(f'lsblk -bdnro size /dev/{disk}') new_disk_size_gib = int(new_disk_size) / 1024 / 1024 / 1024 if timeout_disk_resize == 0: raise VMError('Timeout for disk resize reached') - with settings( - host_string=self.dataset_obj['hostname'], - warn_only=True, - ): - disk_resize = self.run('growpart /dev/{} 1'.format(disk)) - if disk_resize.succeeded: - fs_resize = self.run('resize2fs {}'.format(partition)) - if fs_resize.succeeded: - log.info( - 'successfully resized disk of {} to {}GB'.format( - self.dataset_obj['hostname'], size) - ) - return - - raise VMError('disk resize for {} failed'.format( - self.dataset_obj['hostname']) + disk_resize = self.run( + f'growpart /dev/{disk} 1', warn_only=True, + ) + if disk_resize.ok: + fs_resize = self.run( + f'resize2fs {partition}', warn_only=True, ) + if fs_resize.ok: + log.info( + f'successfully resized disk of {self.dataset_obj["hostname"]} to {size}GB' + ) + return + + raise VMError(f'disk resize for {self.dataset_obj["hostname"]} failed') def aws_sync(self) -> dict: """AWS sync @@ -1256,7 +1242,7 @@ def aws_get_instances_overview( if resp.status == 200: etag = dict(resp.info())['ETag'] else: - log.warning('Could not retrieve ETag from {}'.format(url)) + log.warning(f'Could not retrieve ETag from {url}') etag = None if file.exists() and etag_file.exists() and etag: with open(etag_file, 'r+') as f: diff --git a/requirements.txt b/requirements.txt index 57e7750e..4b8aa399 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,10 @@ setuptools~=73.0 adminapi@git+https://github.com/innogames/serveradmin.git@v4.15.0#egg=adminapi netaddr~=0.8 -cffi~=1.14 -paramiko~=2.7 -Fabric3~=1.14 -libvirt-python>=7,<=9 +cffi>=1.14 +paramiko>=2.7 +fabric~=3.2 +libvirt-python>=7,<=12 jinja2~=2.11 markupsafe~=1.1 boto3~=1.26 diff --git a/tests/conftest.py b/tests/conftest.py index 2941ff94..598f7048 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -110,7 +110,7 @@ def clean_hv(hv, pattern): # Cleanup igvm_locked status after a timeout if hv.dataset_obj['igvm_locked'] is not None: - now = datetime.utcnow().replace(tzinfo=timezone.utc) + now = datetime.now(timezone.utc) diff = now - hv.dataset_obj['igvm_locked'] if diff >= IGVM_LOCKED_TIMEOUT: diff --git a/tests/test_integration.py b/tests/test_integration.py index d939bb68..259d95a3 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -14,9 +14,8 @@ from adminapi import api from adminapi.dataset import Query from adminapi.filters import Any -from fabric.api import env -from fabric.network import disconnect_all -from mock import patch +from igvm.host import disconnect_all +from unittest.mock import patch from igvm.commands import ( _get_vm, @@ -47,7 +46,6 @@ from igvm.puppet import clean_cert from igvm.settings import ( AWS_RETURN_CODES, - COMMON_FABRIC_SETTINGS, HYPERVISOR_ATTRIBUTES, HYPERVISOR_CPU_THRESHOLDS, KVM_HWMODEL_TO_CPUMODEL, @@ -63,9 +61,7 @@ ) basicConfig(level=INFO) -env.update(COMMON_FABRIC_SETTINGS) environ['IGVM_SSH_USER'] = 'igtesting' # Enforce user for integration testing -env.user = 'igtesting' environ['IGVM_MODE'] = 'staging'