Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion node_cli/cli/sync_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import click

from node_cli.core.node import init_sync, update_sync
from node_cli.core.node import init_sync, update_sync, repair_sync
from node_cli.utils.helper import (
abort_if_false,
error_exit,
Expand Down Expand Up @@ -95,3 +95,44 @@ def _init_sync(env_file, archive, catchup, historic_state, snapshot_from: Option
@streamed_cmd
def _update_sync(env_file, unsafe_ok):
update_sync(env_file)


@sync_node.command('repair', help='Start sync node from empty database')
@click.option('--yes', is_flag=True, callback=abort_if_false,
              expose_value=False,
              prompt='Are you sure you want to start sync node from empty database?')
@click.option(
    '--archive',
    help=TEXTS['init']['archive'],
    is_flag=True
)
@click.option(
    '--catchup',
    help=TEXTS['init']['catchup'],
    is_flag=True
)
@click.option(
    '--historic-state',
    help=TEXTS['init']['historic_state'],
    is_flag=True
)
@click.option(
    '--snapshot-from',
    type=IP_TYPE,
    default=None,
    hidden=True,
    help='IP of the node to download the snapshot from'
)
@streamed_cmd
def _repair_sync(
    archive: bool,
    catchup: bool,
    historic_state: bool,
    snapshot_from: Optional[str] = None
) -> None:
    """Wipe the sync node sChain database and restart it from scratch."""
    # is_flag options produce booleans, so the params are typed bool,
    # not str as originally annotated.
    repair_sync(
        archive=archive,
        catchup=catchup,
        historic_state=historic_state,
        snapshot_from=snapshot_from
    )
22 changes: 22 additions & 0 deletions node_cli/core/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
turn_on_op,
restore_op,
init_sync_op,
repair_sync_op,
update_sync_op
)
from node_cli.utils.print_formatters import (
Expand Down Expand Up @@ -234,6 +235,27 @@ def update_sync(env_filepath: str, unsafe_ok: bool = False) -> None:
logger.info('Node update finished')


@check_inited
@check_user
def repair_sync(
    archive: bool,
    catchup: bool,
    historic_state: bool,
    snapshot_from: Optional[str] = None
) -> None:
    """Restart the sync node sChain from an empty database.

    Reads SCHAIN_NAME from the saved init env file and delegates the
    cleanup/restart work to the repair operation.

    :param archive: run schain in archive mode
    :param catchup: run schain in catchup mode
    :param historic_state: enable historic state (requires archive)
    :param snapshot_from: optional IP of a node to download a snapshot from
    """
    env_params = extract_env_params(INIT_ENV_FILEPATH, sync_node=True)
    schain_name = env_params['SCHAIN_NAME']
    repair_sync_op(
        schain_name=schain_name,
        archive=archive,
        catchup=catchup,
        historic_state=historic_state,
        snapshot_from=snapshot_from
    )
    logger.info('Schain was started from scratch')


def get_node_env(
env_filepath,
inited_node=False,
Expand Down
59 changes: 53 additions & 6 deletions node_cli/core/schains.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import glob
import logging
import os
import pprint
Expand All @@ -11,7 +12,8 @@
ALLOCATION_FILEPATH,
NODE_CONFIG_PATH,
NODE_CLI_STATUS_FILENAME,
SCHAIN_NODE_DATA_PATH
SCHAIN_NODE_DATA_PATH,
SCHAINS_MNT_DIR_SYNC
)
from node_cli.configs.env import get_env_config

Expand Down Expand Up @@ -94,20 +96,37 @@ def get_node_cli_schain_status_filepath(schain_name: str) -> str:
return os.path.join(SCHAIN_NODE_DATA_PATH, schain_name, NODE_CLI_STATUS_FILENAME)


def update_node_cli_schain_status(
    schain_name: str,
    repair_ts: Optional[int] = None,
    snapshot_from: Optional[str] = None
) -> None:
    """Create or update the node-cli status file for the given sChain.

    If a status file already exists, its ``repair_ts`` and
    ``snapshot_from`` fields are updated in place; otherwise a fresh
    status document is written.
    """
    path = get_node_cli_schain_status_filepath(schain_name)
    # BUG FIX: the status path points at a FILE; os.path.isdir(path) was
    # always False, so the "update existing status" branch never ran.
    if os.path.isfile(path):
        status = get_node_cli_schain_status(schain_name=schain_name)
        status.update({'repair_ts': repair_ts, 'snapshot_from': snapshot_from})
    else:
        status = {
            'schain_name': schain_name,
            'repair_ts': repair_ts,
            'snapshot_from': snapshot_from
        }
    os.makedirs(os.path.dirname(path), exist_ok=True)
    save_json(path, status)


def get_node_cli_schain_status(schain_name: str) -> dict:
    """Load the node-cli status document stored for *schain_name*."""
    status_path = get_node_cli_schain_status_filepath(schain_name)
    return read_json(status_path)


def toggle_schain_repair_mode(
    schain: str,
    snapshot_from: Optional[str] = None
) -> None:
    """Mark the sChain for repair by stamping repair_ts into its status file.

    :param schain: name of the sChain to repair
    :param snapshot_from: optional IP of a node to download a snapshot from
    """
    # NOTE(review): the span contained a stale duplicate call using the old
    # (schain, status-dict) signature; only the keyword-based call is kept.
    repair_ts = int(time.time())
    update_node_cli_schain_status(
        schain_name=schain,
        repair_ts=repair_ts,
        snapshot_from=snapshot_from
    )
    print('Schain has been set for repair')


Expand Down Expand Up @@ -168,6 +187,10 @@ def make_btrfs_snapshot(src: str, dst: str) -> None:
run_cmd(['btrfs', 'subvolume', 'snapshot', src, dst])


def rm_btrfs_subvolume(subvolume: str) -> None:
    """Delete a btrfs subvolume at *subvolume* via the btrfs CLI."""
    delete_cmd = ['btrfs', 'subvolume', 'delete', subvolume]
    run_cmd(delete_cmd)


def fillin_snapshot_folder(src_path: str, block_number: int) -> None:
snapshots_dirname = 'snapshots'
snapshot_folder_path = os.path.join(
Expand Down Expand Up @@ -224,3 +247,27 @@ def ensure_schain_volume(schain: str, schain_type: str, env_type: str) -> None:
ensure_volume(schain, size)
else:
logger.warning('Volume %s already exists', schain)


def cleanup_sync_datadir(schain_name: str, base_path: str = SCHAINS_MNT_DIR_SYNC) -> None:
    """Remove all on-disk data for a sync node sChain.

    Regular files and directories are removed first (skipping the
    ``snapshots`` folder), then btrfs subvolumes under ``snapshots/*/``
    are deleted via the btrfs CLI, and finally the whole sChain
    directory is removed.

    :param schain_name: name of the sChain whose data dir is wiped
    :param base_path: parent mount dir holding per-sChain data dirs
    """
    schain_path = os.path.join(base_path, schain_name)

    logger.info('Removing regular folders')
    # BUG FIX: the previous glob pattern f'{base}/[!snapshots]*' is a
    # character class ("first char not in s,n,a,p,h,o,t"), NOT a
    # "skip the snapshots dir" filter — e.g. 'static' was wrongly skipped.
    if os.path.isdir(schain_path):
        for entry in os.listdir(schain_path):
            if entry == 'snapshots':
                continue
            filepath = os.path.join(schain_path, entry)
            if os.path.isdir(filepath):
                logger.debug('Removing recursively %s', filepath)
                shutil.rmtree(filepath)
            elif os.path.isfile(filepath):
                os.remove(filepath)

    logger.info('Removing subvolumes')
    subvolumes_pattern = os.path.join(schain_path, 'snapshots', '*', '*')
    for filepath in glob.glob(subvolumes_pattern):
        logger.debug('Deleting subvolume %s', filepath)
        if os.path.isdir(filepath):
            # Plain rmtree cannot remove a btrfs subvolume
            rm_btrfs_subvolume(filepath)
        else:
            os.remove(filepath)

    logger.info('Cleaning up snapshots folder')
    if os.path.isdir(schain_path):
        shutil.rmtree(schain_path)
3 changes: 2 additions & 1 deletion node_cli/operations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@
update_sync as update_sync_op,
turn_off as turn_off_op,
turn_on as turn_on_op,
restore as restore_op
restore as restore_op,
repair_sync as repair_sync_op
)
31 changes: 29 additions & 2 deletions node_cli/operations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@
from node_cli.operations.skale_node import download_skale_node, sync_skale_node, update_images
from node_cli.core.checks import CheckType, run_checks as run_host_checks
from node_cli.core.iptables import configure_iptables
from node_cli.core.schains import update_node_cli_schain_status
from node_cli.core.schains import update_node_cli_schain_status, cleanup_sync_datadir
from node_cli.utils.docker_utils import (
compose_rm,
compose_up,
docker_cleanup,
remove_dynamic_containers
remove_dynamic_containers,
remove_schain_container,
start_admin,
stop_admin
)
from node_cli.utils.meta import get_meta_info, update_meta
from node_cli.utils.print_formatters import print_failed_requirements_checks
Expand Down Expand Up @@ -344,3 +347,27 @@ def restore(env, backup_path, config_only=False):
print_failed_requirements_checks(failed_checks)
return False
return True


def repair_sync(
    schain_name: str,
    archive: bool,
    catchup: bool,
    historic_state: bool,
    snapshot_from: Optional[str]
) -> None:
    """Restart the sync node sChain from scratch.

    Stops the admin container, removes the sChain container, wipes the
    sChain data dir, persists the new node options and cli status, and
    starts the admin container again.
    """
    stop_admin(sync_node=True)
    remove_schain_container(schain_name=schain_name)

    # BUG FIX: this step previously logged the duplicated message
    # 'Updating node options' although it performs the data dir cleanup.
    logger.info('Cleaning up schain data dir')
    cleanup_sync_datadir(schain_name=schain_name)

    logger.info('Updating node options')
    node_options = NodeOptions()
    node_options.archive = archive
    node_options.catchup = catchup
    node_options.historic_state = historic_state

    logger.info('Updating cli status')
    update_node_cli_schain_status(schain_name, snapshot_from=snapshot_from)
    start_admin(sync_node=True)
50 changes: 50 additions & 0 deletions node_cli/utils/docker_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import itertools
import os
import logging
from typing import Optional

import docker
from docker.client import DockerClient
Expand All @@ -39,6 +40,7 @@

logger = logging.getLogger(__name__)

ADMIN_REMOVE_TIMEOUT = 60
SCHAIN_REMOVE_TIMEOUT = 300
IMA_REMOVE_TIMEOUT = 20
TELEGRAF_REMOVE_TIMEOUT = 20
Expand Down Expand Up @@ -131,6 +133,54 @@ def safe_rm(container: Container, timeout=DOCKER_DEFAULT_STOP_TIMEOUT, **kwargs)
logger.info(f'Container removed: {container_name}')


def stop_container(
    container_name: str,
    timeout: int = DOCKER_DEFAULT_STOP_TIMEOUT,
    dclient: Optional[DockerClient] = None
) -> None:
    """Stop a running container by name, waiting up to *timeout* seconds."""
    client = dclient if dclient is not None else docker_client()
    target = client.containers.get(container_name)
    logger.info('Stopping container: %s, timeout: %s', container_name, timeout)
    target.stop(timeout=timeout)


def rm_container(
    container_name: str,
    timeout: int = DOCKER_DEFAULT_STOP_TIMEOUT,
    dclient: Optional[DockerClient] = None
) -> None:
    """Stop and remove a container by name; no-op if it does not exist.

    :param container_name: exact name of the container to remove
    :param timeout: seconds to wait for a graceful stop before removal
    :param dclient: optional docker client (a default one is created if None)
    """
    dc = dclient or docker_client()
    container_names = [container.name for container in get_containers()]
    if container_name in container_names:
        container = dc.containers.get(container_name)
        # BUG FIX: the timeout was previously dropped, so callers such as
        # remove_schain_container (SCHAIN_REMOVE_TIMEOUT=300) silently got
        # the default stop timeout instead.
        safe_rm(container, timeout=timeout)


def start_container(
    container_name: str,
    dclient: Optional[DockerClient] = None
) -> None:
    """Start an existing container by name."""
    client = dclient if dclient is not None else docker_client()
    logger.info('Starting container %s', container_name)
    client.containers.get(container_name).start()


def start_admin(sync_node: bool = False, dclient: Optional[DockerClient] = None) -> None:
    """Start the skale admin container (sync flavour when *sync_node*)."""
    name = 'skale_sync_admin' if sync_node else 'skale_admin'
    start_container(container_name=name, dclient=dclient)


def stop_admin(sync_node: bool = False, dclient: Optional[DockerClient] = None) -> None:
    """Stop the skale admin container (sync flavour when *sync_node*)."""
    name = 'skale_sync_admin' if sync_node else 'skale_admin'
    stop_container(container_name=name, timeout=ADMIN_REMOVE_TIMEOUT, dclient=dclient)


def remove_schain_container(schain_name: str, dclient: Optional[DockerClient] = None) -> None:
    """Remove the skale_schain_<name> container for the given sChain."""
    rm_container(
        f'skale_schain_{schain_name}',
        timeout=SCHAIN_REMOVE_TIMEOUT,
        dclient=dclient
    )


def backup_container_logs(
container: Container,
head: int = DOCKER_DEFAULT_HEAD_LINES,
Expand Down
13 changes: 11 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from node_cli.utils.docker_utils import docker_client
from node_cli.utils.global_config import generate_g_config_file

from tests.helper import TEST_META_V1, TEST_META_V2, TEST_META_V3
from tests.helper import TEST_META_V1, TEST_META_V2, TEST_META_V3, TEST_SCHAINS_MNT_DIR_SYNC


TEST_ENV_PARAMS = """
Expand Down Expand Up @@ -307,8 +307,17 @@ def tmp_config_dir():

@pytest.fixture
def tmp_schains_dir():
    """Create the schains data dir for a test and remove it afterwards."""
    # NOTE(review): the span contained both the old bare os.makedirs (which
    # raises if the dir already exists) and the exist_ok variant; only the
    # idempotent exist_ok=True call is kept.
    os.makedirs(SCHAIN_NODE_DATA_PATH, exist_ok=True)
    try:
        yield SCHAIN_NODE_DATA_PATH
    finally:
        shutil.rmtree(SCHAIN_NODE_DATA_PATH)


@pytest.fixture
def tmp_sync_datadir():
    """Yield the sync-node schains mount dir, creating it first and wiping it after."""
    path = TEST_SCHAINS_MNT_DIR_SYNC
    os.makedirs(path, exist_ok=True)
    try:
        yield path
    finally:
        shutil.rmtree(path)
13 changes: 11 additions & 2 deletions tests/core/core_node_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from node_cli.configs import NODE_DATA_PATH
from node_cli.configs.resource_allocation import RESOURCE_ALLOCATION_FILEPATH
from node_cli.core.node import BASE_CONTAINERS_AMOUNT, is_base_containers_alive
from node_cli.core.node import init, pack_dir, update, is_update_safe
from node_cli.core.node import init, pack_dir, update, is_update_safe, repair_sync

from tests.helper import response_mock, safe_update_api_response, subprocess_run_mock
from tests.resources_test import BIG_DISK_SIZE
Expand Down Expand Up @@ -169,7 +169,9 @@ def test_update_node(mocked_g_config, resource_file):
), mock.patch('node_cli.core.resources.get_disk_size', return_value=BIG_DISK_SIZE), mock.patch(
'node_cli.core.host.init_data_dir'
):
with mock.patch('node_cli.utils.helper.requests.get', return_value=safe_update_api_response()): # noqa
with mock.patch(
'node_cli.utils.helper.requests.get', return_value=safe_update_api_response()
): # noqa
result = update(env_filepath, pull_config_for_schain=None)
assert result is None

Expand All @@ -183,3 +185,10 @@ def test_is_update_safe():
'node_cli.utils.helper.requests.get', return_value=safe_update_api_response(safe=False)
):
assert not is_update_safe()


def test_repair_sync(tmp_sync_datadir, mocked_g_config, resource_file):
    """Smoke-test repair_sync with btrfs and docker interactions mocked out."""
    with mock.patch('node_cli.core.schains.rm_btrfs_subvolume'):
        with mock.patch('node_cli.utils.docker_utils.stop_container'):
            with mock.patch('node_cli.utils.docker_utils.start_container'):
                repair_sync(
                    archive=True,
                    catchup=True,
                    historic_state=True,
                    snapshot_from='127.0.0.1'
                )
Loading