Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ssg/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
OSCAP_GROUP_VAL = "xccdf_%s.content_group_values" % OSCAP_VENDOR
OSCAP_GROUP_NON_PCI = "xccdf_%s.content_group_non-pci-dss" % OSCAP_VENDOR
OSCAP_PATH = "oscap"
OSCAP_PROFILE_ALL_ID = "(all)"
XCCDF11_NS = "http://checklists.nist.gov/xccdf/1.1"
XCCDF12_NS = "http://checklists.nist.gov/xccdf/1.2"
min_ansible_version = "2.5"
Expand Down
42 changes: 41 additions & 1 deletion tests/ssg_test_suite/oscap.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import json
import datetime


from ssg.constants import OSCAP_PROFILE_ALL_ID

from ssg_test_suite.log import LogHelper
from ssg_test_suite import test_env
from ssg_test_suite import common
Expand All @@ -28,6 +31,9 @@
_XCCDF_NS = 'http://checklists.nist.gov/xccdf/1.2'


PROFILE_ALL_ID_SINGLE_QUOTED = False


def analysis_to_serializable(analysis):
result = dict(analysis)
for key, value in analysis.items():
Expand Down Expand Up @@ -107,6 +113,13 @@ def get_result_id_from_arf(arf_path, verbose_path):
return res_id


def single_quote_string(input):
result = input
for char in "\"'":
result = result.replace(char, "")
return "'{}'".format(result)


def generate_fixes_remotely(formatting, verbose_path):
command_base = ['oscap', 'xccdf', 'generate', 'fix']
command_options = [
Expand All @@ -119,7 +132,8 @@ def generate_fixes_remotely(formatting, verbose_path):
if 'result_id' in formatting:
command_options.extend(['--result-id', formatting['result_id']])

command_string = ' '.join(command_base + command_options + command_operands)
command_components = command_base + command_options + command_operands
command_string = ' '.join([single_quote_string(c) for c in command_components])
rc, stdout = common.run_cmd_remote(
command_string, formatting['domain_ip'], verbose_path)
if rc != 0:
Expand Down Expand Up @@ -208,6 +222,32 @@ def send_arf_to_remote_machine_and_generate_remediations_there(
return False


def is_virtual_oscap_profile(profile):
""" Test if the profile belongs to the so called category virtual
from OpenSCAP available profiles. It can be (all) or other id we
might come up in the future, it just needs to be encapsulated
with parenthesis for example "(custom_profile)".
"""
if profile is not None:
if profile == OSCAP_PROFILE_ALL_ID:
return True
else:
if "(" == profile[:1] and ")" == profile[-1:]:
return True
return False


def process_profile_id(profile):
# Detect if the profile is virtual and include single quotes if needed.
if is_virtual_oscap_profile(profile):
if PROFILE_ALL_ID_SINGLE_QUOTED:
return "'{}'".format(profile)
else:
return profile
else:
return profile


class GenericRunner(object):
def __init__(self, environment, profile, datastream, benchmark_id):
self.environment = environment
Expand Down
15 changes: 7 additions & 8 deletions tests/ssg_test_suite/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import collections
import json

from ssg.constants import OSCAP_PROFILE
from ssg.constants import OSCAP_PROFILE, OSCAP_PROFILE_ALL_ID
from ssg_test_suite import oscap
from ssg_test_suite import xml_operations
from ssg_test_suite import test_env
Expand All @@ -18,8 +18,6 @@
import data


ALL_PROFILE_ID = "(all)"

logging.getLogger(__name__).addHandler(logging.NullHandler())


Expand All @@ -36,7 +34,7 @@ def get_viable_profiles(selected_profiles, datastream, benchmark):
all_profiles_elements = xml_operations.get_all_profiles_in_benchmark(
datastream, benchmark, logging)
all_profiles = [el.attrib["id"] for el in all_profiles_elements]
all_profiles.append(ALL_PROFILE_ID)
all_profiles.append(OSCAP_PROFILE_ALL_ID)

for ds_profile in all_profiles:
if 'ALL' in selected_profiles:
Expand Down Expand Up @@ -122,7 +120,7 @@ def _run_test(self, profile, test_data):

runner_cls = oscap.REMEDIATION_RULE_RUNNERS[self.remediate_using]
runner = runner_cls(
self.test_env, profile, self.datastream, self.benchmark_id,
self.test_env, oscap.process_profile_id(profile), self.datastream, self.benchmark_id,
rule_id, scenario.script, self.dont_clean, self.manual_debug)
if not self._initial_scan_went_ok(runner, rule_id, scenario.context):
return False
Expand Down Expand Up @@ -229,10 +227,10 @@ def _modify_parameters(self, script, params):
params['profiles'] = [self.scenarios_profile]
Comment thread
ggbecker marked this conversation as resolved.

if not params["profiles"]:
params["profiles"].append(ALL_PROFILE_ID)
params["profiles"].append(OSCAP_PROFILE_ALL_ID)
logging.debug(
"Added the {0} profile to the list of available profiles for {1}"
.format(ALL_PROFILE_ID, script))
.format(OSCAP_PROFILE_ALL_ID, script))
return params

def _parse_parameters(self, script):
Expand Down Expand Up @@ -343,7 +341,8 @@ def perform_rule_check(options):
checker.scenarios_profile = options.scenarios_profile
# check if target is a complete profile ID, if not prepend profile prefix
if (checker.scenarios_profile is not None and
not checker.scenarios_profile.startswith(OSCAP_PROFILE)):
not checker.scenarios_profile.startswith(OSCAP_PROFILE) and
not oscap.is_virtual_oscap_profile(checker.scenarios_profile)):
checker.scenarios_profile = OSCAP_PROFILE+options.scenarios_profile

checker.test_target(options.target)
1 change: 0 additions & 1 deletion tests/test_suite.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ def parse_args():
" Variable selections will be done according "
"to this profile.")


parser_combined = subparsers.add_parser("combined",
help=("Tests all rules in a profile evaluating them "
"against their test scenarios."),
Expand Down