diff --git a/ssg/constants.py b/ssg/constants.py index 09808480a71f..10f1f75b8e6f 100644 --- a/ssg/constants.py +++ b/ssg/constants.py @@ -51,6 +51,7 @@ OSCAP_GROUP_VAL = "xccdf_%s.content_group_values" % OSCAP_VENDOR OSCAP_GROUP_NON_PCI = "xccdf_%s.content_group_non-pci-dss" % OSCAP_VENDOR OSCAP_PATH = "oscap" +OSCAP_PROFILE_ALL_ID = "(all)" XCCDF11_NS = "http://checklists.nist.gov/xccdf/1.1" XCCDF12_NS = "http://checklists.nist.gov/xccdf/1.2" min_ansible_version = "2.5" diff --git a/tests/ssg_test_suite/oscap.py b/tests/ssg_test_suite/oscap.py index fbec5878e30f..954e36140c36 100644 --- a/tests/ssg_test_suite/oscap.py +++ b/tests/ssg_test_suite/oscap.py @@ -9,6 +9,9 @@ import json import datetime + +from ssg.constants import OSCAP_PROFILE_ALL_ID + from ssg_test_suite.log import LogHelper from ssg_test_suite import test_env from ssg_test_suite import common @@ -28,6 +31,9 @@ _XCCDF_NS = 'http://checklists.nist.gov/xccdf/1.2' +PROFILE_ALL_ID_SINGLE_QUOTED = False + + def analysis_to_serializable(analysis): result = dict(analysis) for key, value in analysis.items(): @@ -107,6 +113,13 @@ def get_result_id_from_arf(arf_path, verbose_path): return res_id +def single_quote_string(input): + result = input + for char in "\"'": + result = result.replace(char, "") + return "'{}'".format(result) + + def generate_fixes_remotely(formatting, verbose_path): command_base = ['oscap', 'xccdf', 'generate', 'fix'] command_options = [ @@ -119,7 +132,8 @@ def generate_fixes_remotely(formatting, verbose_path): if 'result_id' in formatting: command_options.extend(['--result-id', formatting['result_id']]) - command_string = ' '.join(command_base + command_options + command_operands) + command_components = command_base + command_options + command_operands + command_string = ' '.join([single_quote_string(c) for c in command_components]) rc, stdout = common.run_cmd_remote( command_string, formatting['domain_ip'], verbose_path) if rc != 0: @@ -208,6 +222,32 @@ def send_arf_to_remote_machine_and_generate_remediations_there( return False +def is_virtual_oscap_profile(profile): + """ Test if the profile belongs to the so called category virtual + from OpenSCAP available profiles. It can be (all) or other id we + might come up in the future, it just needs to be encapsulated + with parenthesis for example "(custom_profile)". + """ + if profile is not None: + if profile == OSCAP_PROFILE_ALL_ID: + return True + else: + if "(" == profile[:1] and ")" == profile[-1:]: + return True + return False + + +def process_profile_id(profile): + # Detect if the profile is virtual and include single quotes if needed. + if is_virtual_oscap_profile(profile): + if PROFILE_ALL_ID_SINGLE_QUOTED: + return "'{}'".format(profile) + else: + return profile + else: + return profile + + class GenericRunner(object): def __init__(self, environment, profile, datastream, benchmark_id): self.environment = environment diff --git a/tests/ssg_test_suite/rule.py b/tests/ssg_test_suite/rule.py index f38bb5e22369..b25329372c57 100644 --- a/tests/ssg_test_suite/rule.py +++ b/tests/ssg_test_suite/rule.py @@ -9,7 +9,7 @@ import collections import json -from ssg.constants import OSCAP_PROFILE +from ssg.constants import OSCAP_PROFILE, OSCAP_PROFILE_ALL_ID from ssg_test_suite import oscap from ssg_test_suite import xml_operations from ssg_test_suite import test_env @@ -18,8 +18,6 @@ import data -ALL_PROFILE_ID = "(all)" - logging.getLogger(__name__).addHandler(logging.NullHandler()) @@ -36,7 +34,7 @@ def get_viable_profiles(selected_profiles, datastream, benchmark): all_profiles_elements = xml_operations.get_all_profiles_in_benchmark( datastream, benchmark, logging) all_profiles = [el.attrib["id"] for el in all_profiles_elements] - all_profiles.append(ALL_PROFILE_ID) + all_profiles.append(OSCAP_PROFILE_ALL_ID) for ds_profile in all_profiles: if 'ALL' in selected_profiles: @@ -122,7 +120,7 @@ def _run_test(self, profile, test_data): runner_cls = oscap.REMEDIATION_RULE_RUNNERS[self.remediate_using] runner = runner_cls( - self.test_env, profile, self.datastream, self.benchmark_id, + self.test_env, oscap.process_profile_id(profile), self.datastream, self.benchmark_id, rule_id, scenario.script, self.dont_clean, self.manual_debug) if not self._initial_scan_went_ok(runner, rule_id, scenario.context): return False @@ -229,10 +227,10 @@ def _modify_parameters(self, script, params): params['profiles'] = [self.scenarios_profile] if not params["profiles"]: - params["profiles"].append(ALL_PROFILE_ID) + params["profiles"].append(OSCAP_PROFILE_ALL_ID) logging.debug( "Added the {0} profile to the list of available profiles for {1}" - .format(ALL_PROFILE_ID, script)) + .format(OSCAP_PROFILE_ALL_ID, script)) return params def _parse_parameters(self, script): @@ -343,7 +341,8 @@ def perform_rule_check(options): checker.scenarios_profile = options.scenarios_profile # check if target is a complete profile ID, if not prepend profile prefix if (checker.scenarios_profile is not None and - not checker.scenarios_profile.startswith(OSCAP_PROFILE)): + not checker.scenarios_profile.startswith(OSCAP_PROFILE) and + not oscap.is_virtual_oscap_profile(checker.scenarios_profile)): checker.scenarios_profile = OSCAP_PROFILE+options.scenarios_profile checker.test_target(options.target) diff --git a/tests/test_suite.py b/tests/test_suite.py index 2cf877fcfc12..e53bc0dd4642 100755 --- a/tests/test_suite.py +++ b/tests/test_suite.py @@ -144,7 +144,6 @@ def parse_args(): " Variable selections will be done according " "to this profile.") - parser_combined = subparsers.add_parser("combined", help=("Tests all rules in a profile evaluating them " "against their test scenarios."),