From 4c128b10fb392746b326a8a7dfbe448857237574 Mon Sep 17 00:00:00 2001 From: Mahati Chamarthy Date: Tue, 5 May 2026 14:24:21 +0100 Subject: [PATCH 1/4] confcom: Remove docker dependency when tar is supplied --- src/confcom/azext_confcom/custom.py | 11 +- src/confcom/azext_confcom/security_policy.py | 151 +++++++++++++------ 2 files changed, 117 insertions(+), 45 deletions(-) diff --git a/src/confcom/azext_confcom/custom.py b/src/confcom/azext_confcom/custom.py index 7f1885ae103..0fcc48d3081 100644 --- a/src/confcom/azext_confcom/custom.py +++ b/src/confcom/azext_confcom/custom.py @@ -131,6 +131,7 @@ def acipolicygen_confcom( disable_stdio=(not stdio_enabled), exclude_default_fragments=exclude_default_fragments, platform=platform, + tar_mapping=tar_mapping, ) elif arm_template: container_group_policies = security_policy.load_policy_from_arm_template_file( @@ -144,11 +145,12 @@ def acipolicygen_confcom( rego_imports=fragments_list, exclude_default_fragments=exclude_default_fragments, platform=platform, + tar_mapping=tar_mapping, ) elif image_name: container_group_policies = security_policy.load_policy_from_image_name( image_name, debug_mode=debug_mode, disable_stdio=(not stdio_enabled), - platform=platform, + platform=platform, tar_mapping=tar_mapping, ) elif virtual_node_yaml_path: container_group_policies = security_policy.load_policy_from_virtual_node_yaml_file( @@ -161,6 +163,7 @@ def acipolicygen_confcom( exclude_default_fragments=exclude_default_fragments, infrastructure_svn=infrastructure_svn, platform=platform, + tar_mapping=tar_mapping, ) elif container_definitions: container_group_policies = AciPolicy( @@ -325,14 +328,16 @@ def acifragmentgen_confcom( if image_name: policy = security_policy.load_policy_from_image_name( - image_name, debug_mode=debug_mode, disable_stdio=(not stdio_enabled) + image_name, debug_mode=debug_mode, disable_stdio=(not stdio_enabled), + tar_mapping=tar_mapping, ) elif input_path: # this is using --input if not tar_mapping: tar_mapping = os_util.load_tar_mapping_from_config_file(input_path) policy = security_policy.load_policy_from_json_file( - input_path, debug_mode=debug_mode, disable_stdio=(not stdio_enabled) + input_path, debug_mode=debug_mode, disable_stdio=(not stdio_enabled), + tar_mapping=tar_mapping, ) elif container_definitions: policy = AciPolicy( diff --git a/src/confcom/azext_confcom/security_policy.py b/src/confcom/azext_confcom/security_policy.py index 58bd0673842..5af00a5eb92 100644 --- a/src/confcom/azext_confcom/security_policy.py +++ b/src/confcom/azext_confcom/security_policy.py @@ -679,59 +679,117 @@ def set_images(self, images: List[ContainerImage]) -> None: self._images = images -def validate_image_platform(image_name: str, platform: str) -> None: - """Validate that the image's platform matches --platform. +def _get_platform_from_tar(tar_location: str, image_name: str) -> Optional[str]: + """Extract the platform (os/architecture) from a tar file's config blob. - Checks the local Docker image first, then attempts to pull with the - specified platform if not found locally. Verifies the image's - Os/Architecture attrs match the requested platform. + Supports both Docker save format and OCI image layout. + Returns a string like "linux/amd64" or None if platform cannot be determined. """ + import tarfile as tarfile_module + + def _platform_from_config(tar, config_path): + data = json.loads(tar.extractfile(config_path).read()) + return f"{data['os']}/{data['architecture']}" + + try: + with tarfile_module.open(tar_location) as tar: + # Try Docker save format: manifest.json with RepoTags + try: + manifest = json.loads(tar.extractfile("manifest.json").read()) + for entry in manifest: + if image_name in (entry.get("RepoTags") or []): + return _platform_from_config(tar, entry["Config"]) + except (KeyError, TypeError): + pass + + # Try OCI layout: index.json → manifest blob → config blob + try: + oci_name = f"docker.io/library/{image_name}" if "/" not in image_name else image_name + index = json.loads(tar.extractfile("index.json").read()) + for m in index.get("manifests", []): + ann_name = (m.get("annotations") or {}).get("io.containerd.image.name", "") + if ann_name and ann_name != oci_name: + continue + media_type = m.get("mediaType", "") + if media_type not in [ + "application/vnd.docker.distribution.manifest.v2+json", + "application/vnd.oci.image.manifest.v1+json", + ]: + continue + algo, digest = m["digest"].split(":", 1) + nested = json.loads(tar.extractfile(f"blobs/{algo}/{digest}").read()) + c_algo, c_digest = nested["config"]["digest"].split(":", 1) + return _platform_from_config(tar, f"blobs/{c_algo}/{c_digest}") + except (KeyError, TypeError): + pass + except (OSError, tarfile_module.TarError) as e: + logger.warning("Could not read platform from tar file: %s", e) + + return None + + +def _get_docker_image(image_name: str, platform: str): + """Get a Docker image locally or by pulling. Returns the image or calls eprint on failure.""" import docker as docker_module try: client = docker_module.from_env() except docker_module.errors.DockerException: eprint("Docker is not running. Please start Docker.") - return - image = None - - # Try local image first + # Try local first try: - image = client.images.get(image_name) + return client.images.get(image_name) except (docker_module.errors.ImageNotFound, docker_module.errors.NullResource): pass - # If not local, try pulling with the specified platform - if image is None: - try: - image = client.images.pull(image_name, platform=platform) - except (docker_module.errors.ImageNotFound, docker_module.errors.NotFound): + # Pull with specified platform + try: + return client.images.pull(image_name, platform=platform) + except (docker_module.errors.ImageNotFound, docker_module.errors.NotFound): + eprint( + f'Image "{image_name}" is not found. ' + f'Please check the image name and repository.' + ) + except docker_module.errors.APIError as e: + error_msg = str(e).lower() + if "not supported" in error_msg or "no matching manifest" in error_msg: eprint( - f'Image "{image_name}" is not found. ' - f'Please check the image name and repository.' + f'Image "{image_name}" could not be pulled for platform "{platform}". ' + f'Docker Desktop must be in the correct container mode ' + f'(Linux containers for linux/amd64, ' + f'Windows containers for windows/amd64).' + ) + else: + eprint( + f'Image "{image_name}" could not be pulled for platform ' + f'"{platform}": {e}' ) - except docker_module.errors.APIError as e: - error_msg = str(e).lower() - if "not supported" in error_msg or "no matching manifest" in error_msg: - eprint( - f'Image "{image_name}" could not be pulled for platform "{platform}". ' - f'Docker Desktop must be in the correct container mode ' - f'(Linux containers for linux/amd64, ' - f'Windows containers for windows/amd64).' - ) - else: - eprint( - f'Image "{image_name}" could not be pulled for platform ' - f'"{platform}": {e}' - ) - if image is None: - eprint( - f'Image "{image_name}" could not be retrieved for platform validation.' - ) - return + eprint(f'Image "{image_name}" could not be retrieved for platform validation.') + + +def _detect_image_platform(image_name: str, platform: str, tar_mapping=None) -> str: + """Detect the platform of an image from tar or Docker. Returns os/arch string.""" + if tar_mapping: + tar_location = get_tar_location_from_mapping(tar_mapping, image_name) + if tar_location: + detected = _get_platform_from_tar(tar_location, image_name) + if detected: + return detected - detected = f"{image.attrs.get('Os')}/{image.attrs.get('Architecture')}" + image = _get_docker_image(image_name, platform) + return f"{image.attrs.get('Os')}/{image.attrs.get('Architecture')}" + + +def validate_image_platform(image_name: str, platform: str, tar_mapping=None) -> None: + """Validate that the image's platform matches --platform. + + When tar_mapping is provided and the image is found in the tar, + platform is read directly from the tar config blob without Docker. + Otherwise, checks the local Docker image first, then attempts to pull + with the specified platform if not found locally. + """ + detected = _detect_image_platform(image_name, platform, tar_mapping) if detected != platform: eprint( f'Image "{image_name}" has platform "{detected}", ' @@ -752,6 +810,7 @@ def load_policy_from_arm_template_str( fragment_contents: Any = None, exclude_default_fragments: bool = False, platform: str = "linux/amd64", + tar_mapping=None, ) -> List[AciPolicy]: """Function that converts ARM template string to an ACI Policy""" input_arm_json = os_util.load_json_from_str(template_data) @@ -902,7 +961,7 @@ def load_policy_from_arm_template_str( # Resolve ARM parameters/variables to get the real image name for validation resolved_image = find_value_in_params_and_vars(all_params, all_vars, image_name) - validate_image_platform(resolved_image, platform) + validate_image_platform(resolved_image, platform, tar_mapping=tar_mapping) exec_processes = [] extract_probe(exec_processes, image_properties, config.ACI_FIELD_CONTAINERS_READINESS_PROBE) @@ -963,6 +1022,7 @@ def load_policy_from_arm_template_file( fragment_contents: list = None, exclude_default_fragments: bool = False, platform: str = "linux/amd64", + tar_mapping=None, ) -> List[AciPolicy]: """Utility function: generate policy object from given arm template and parameter file paths""" input_arm_json = os_util.load_str_from_file(template_path) @@ -981,12 +1041,13 @@ def load_policy_from_arm_template_file( fragment_contents=fragment_contents, exclude_default_fragments=exclude_default_fragments, platform=platform, + tar_mapping=tar_mapping, ) def load_policy_from_image_name( image_names: Union[List[str], str], debug_mode: bool = False, disable_stdio: bool = False, - platform: str = "linux/amd64", + platform: str = "linux/amd64", tar_mapping=None, ) -> AciPolicy: # can either take a list of image names or a single image name if isinstance(image_names, str): @@ -994,7 +1055,7 @@ def load_policy_from_image_name( containers = [] for image_name in image_names: - validate_image_platform(image_name, platform) + validate_image_platform(image_name, platform, tar_mapping=tar_mapping) container = {} # assign just the fields that are expected @@ -1031,6 +1092,7 @@ def load_policy_from_json_file( infrastructure_svn: str = None, exclude_default_fragments: bool = False, platform: str = "linux/amd64", + tar_mapping=None, ) -> AciPolicy: json_content = os_util.load_str_from_file(data) return load_policy_from_json( @@ -1040,6 +1102,7 @@ def load_policy_from_json_file( infrastructure_svn=infrastructure_svn, exclude_default_fragments=exclude_default_fragments, platform=platform, + tar_mapping=tar_mapping, ) @@ -1050,6 +1113,7 @@ def load_policy_from_json( infrastructure_svn: str = None, exclude_default_fragments: bool = False, platform: str = "linux/amd64", + tar_mapping=None, ) -> AciPolicy: output_containers = [] # 1) Parse incoming string as JSON @@ -1113,7 +1177,7 @@ def load_policy_from_json( f'Field ["{config.ACI_FIELD_TEMPLATE_IMAGE}"] is empty or cannot be found' ) - validate_image_platform(image_name, platform) + validate_image_platform(image_name, platform, tar_mapping=tar_mapping) container_name = case_insensitive_dict_get( container, config.ACI_FIELD_CONTAINERS_NAME @@ -1218,6 +1282,7 @@ def load_policy_from_virtual_node_yaml_file( fragment_contents: list = None, infrastructure_svn: str = None, platform: str = "linux/amd64", + tar_mapping=None, ) -> List[AciPolicy]: yaml_contents_str = os_util.load_str_from_file(virtual_node_yaml_path) return load_policy_from_virtual_node_yaml_str( @@ -1231,6 +1296,7 @@ def load_policy_from_virtual_node_yaml_file( fragment_contents=fragment_contents, infrastructure_svn=infrastructure_svn, platform=platform, + tar_mapping=tar_mapping, ) @@ -1246,6 +1312,7 @@ def load_policy_from_virtual_node_yaml_str( fragment_contents: Any = None, infrastructure_svn: str = None, platform: str = "linux/amd64", + tar_mapping=None, ) -> List[AciPolicy]: """ Load a virtual node yaml file and generate a policy object @@ -1328,7 +1395,7 @@ def load_policy_from_virtual_node_yaml_str( if not image: eprint("Container does not have an image field") - validate_image_platform(image, platform) + validate_image_platform(image, platform, tar_mapping=tar_mapping) # env vars envs = process_env_vars_from_yaml( From d49cb90bf2b2b10e283f5bc9af634970a20f7c6b Mon Sep 17 00:00:00 2001 From: Mahati Chamarthy Date: Tue, 5 May 2026 16:12:29 +0100 Subject: [PATCH 2/4] confcom: Remove tar check duplicacy --- src/confcom/azext_confcom/os_util.py | 2 + src/confcom/azext_confcom/security_policy.py | 142 +++++++------------ 2 files changed, 51 insertions(+), 93 deletions(-) diff --git a/src/confcom/azext_confcom/os_util.py b/src/confcom/azext_confcom/os_util.py index 4d838be551c..92376b265a7 100644 --- a/src/confcom/azext_confcom/os_util.py +++ b/src/confcom/azext_confcom/os_util.py @@ -204,6 +204,7 @@ def map_image_from_tar_oci_layout_v1(image_name: str, tar: TarFile, tar_location image_info_raw = load_json_from_str(image_info_raw_bytes) image_info = image_info_raw.get("config") image_info["Architecture"] = image_info_raw.get("architecture") + image_info["Os"] = image_info_raw.get("os") return image_info eprint(f"Image '{image_name}' is not found in '{tar_location}'") @@ -238,6 +239,7 @@ def map_image_from_tar(image_name: str, tar: TarFile, _tar_location: str): image_info = image_info_raw.get("config") # importing the constant from config.py gives a circular dependency error image_info["Architecture"] = image_info_raw.get("architecture") + image_info["Os"] = image_info_raw.get("os") return image_info diff --git a/src/confcom/azext_confcom/security_policy.py b/src/confcom/azext_confcom/security_policy.py index 5af00a5eb92..00f4ac968e7 100644 --- a/src/confcom/azext_confcom/security_policy.py +++ b/src/confcom/azext_confcom/security_policy.py @@ -494,6 +494,18 @@ def populate_policy_content_for_all_images( logger.info("Processing image: %s", image_name) image_info, tar = get_image_info(progress, message_queue, tar_mapping, image) + # validate image platform matches --platform + if image_info and self._platform: + detected_os = image_info.get("Os") or image_info.get("platform", "").split("/")[0] + detected_arch = image_info.get("Architecture") or image_info.get("platform", "").split("/")[-1] + if detected_os and detected_arch: + detected = f"{detected_os}/{detected_arch}" + if detected != self._platform: + eprint( + f'Image "{image_name}" has platform "{detected}", ' + f'which does not match the specified platform "{self._platform}".' + ) + # verify and populate the working directory property if not image.get_working_dir() and image_info: workingDir = image_info.get("WorkingDir") @@ -679,57 +691,19 @@ def set_images(self, images: List[ContainerImage]) -> None: self._images = images -def _get_platform_from_tar(tar_location: str, image_name: str) -> Optional[str]: - """Extract the platform (os/architecture) from a tar file's config blob. +def validate_image_platform(image_name: str, platform: str, tar_mapping=None) -> None: + """Validate that the image's platform matches --platform. - Supports both Docker save format and OCI image layout. - Returns a string like "linux/amd64" or None if platform cannot be determined. + When tar_mapping is provided and the image is found in the tar, + validation is skipped here — it will be performed later in + populate_policy_content_for_all_images after the tar config is read. + Without tar, checks via Docker (local get or pull). """ - import tarfile as tarfile_module - - def _platform_from_config(tar, config_path): - data = json.loads(tar.extractfile(config_path).read()) - return f"{data['os']}/{data['architecture']}" + if tar_mapping: + tar_location = get_tar_location_from_mapping(tar_mapping, image_name) + if tar_location: + return - try: - with tarfile_module.open(tar_location) as tar: - # Try Docker save format: manifest.json with RepoTags - try: - manifest = json.loads(tar.extractfile("manifest.json").read()) - for entry in manifest: - if image_name in (entry.get("RepoTags") or []): - return _platform_from_config(tar, entry["Config"]) - except (KeyError, TypeError): - pass - - # Try OCI layout: index.json → manifest blob → config blob - try: - oci_name = f"docker.io/library/{image_name}" if "/" not in image_name else image_name - index = json.loads(tar.extractfile("index.json").read()) - for m in index.get("manifests", []): - ann_name = (m.get("annotations") or {}).get("io.containerd.image.name", "") - if ann_name and ann_name != oci_name: - continue - media_type = m.get("mediaType", "") - if media_type not in [ - "application/vnd.docker.distribution.manifest.v2+json", - "application/vnd.oci.image.manifest.v1+json", - ]: - continue - algo, digest = m["digest"].split(":", 1) - nested = json.loads(tar.extractfile(f"blobs/{algo}/{digest}").read()) - c_algo, c_digest = nested["config"]["digest"].split(":", 1) - return _platform_from_config(tar, f"blobs/{c_algo}/{c_digest}") - except (KeyError, TypeError): - pass - except (OSError, tarfile_module.TarError) as e: - logger.warning("Could not read platform from tar file: %s", e) - - return None - - -def _get_docker_image(image_name: str, platform: str): - """Get a Docker image locally or by pulling. Returns the image or calls eprint on failure.""" import docker as docker_module try: client = docker_module.from_env() @@ -738,58 +712,39 @@ def _get_docker_image(image_name: str, platform: str): # Try local first try: - return client.images.get(image_name) + image = client.images.get(image_name) except (docker_module.errors.ImageNotFound, docker_module.errors.NullResource): - pass + image = None # Pull with specified platform - try: - return client.images.pull(image_name, platform=platform) - except (docker_module.errors.ImageNotFound, docker_module.errors.NotFound): - eprint( - f'Image "{image_name}" is not found. ' - f'Please check the image name and repository.' - ) - except docker_module.errors.APIError as e: - error_msg = str(e).lower() - if "not supported" in error_msg or "no matching manifest" in error_msg: - eprint( - f'Image "{image_name}" could not be pulled for platform "{platform}". ' - f'Docker Desktop must be in the correct container mode ' - f'(Linux containers for linux/amd64, ' - f'Windows containers for windows/amd64).' - ) - else: + if image is None: + try: + image = client.images.pull(image_name, platform=platform) + except (docker_module.errors.ImageNotFound, docker_module.errors.NotFound): eprint( - f'Image "{image_name}" could not be pulled for platform ' - f'"{platform}": {e}' + f'Image "{image_name}" is not found. ' + f'Please check the image name and repository.' ) + except docker_module.errors.APIError as e: + error_msg = str(e).lower() + if "not supported" in error_msg or "no matching manifest" in error_msg: + eprint( + f'Image "{image_name}" could not be pulled for platform "{platform}". ' + f'Docker Desktop must be in the correct container mode ' + f'(Linux containers for linux/amd64, ' + f'Windows containers for windows/amd64).' + ) + else: + eprint( + f'Image "{image_name}" could not be pulled for platform ' + f'"{platform}": {e}' + ) - eprint(f'Image "{image_name}" could not be retrieved for platform validation.') - - -def _detect_image_platform(image_name: str, platform: str, tar_mapping=None) -> str: - """Detect the platform of an image from tar or Docker. Returns os/arch string.""" - if tar_mapping: - tar_location = get_tar_location_from_mapping(tar_mapping, image_name) - if tar_location: - detected = _get_platform_from_tar(tar_location, image_name) - if detected: - return detected - - image = _get_docker_image(image_name, platform) - return f"{image.attrs.get('Os')}/{image.attrs.get('Architecture')}" - - -def validate_image_platform(image_name: str, platform: str, tar_mapping=None) -> None: - """Validate that the image's platform matches --platform. + if image is None: + eprint(f'Image "{image_name}" could not be retrieved for platform validation.') + return - When tar_mapping is provided and the image is found in the tar, - platform is read directly from the tar config blob without Docker. - Otherwise, checks the local Docker image first, then attempts to pull - with the specified platform if not found locally. - """ - detected = _detect_image_platform(image_name, platform, tar_mapping) + detected = f"{image.attrs.get('Os')}/{image.attrs.get('Architecture')}" if detected != platform: eprint( f'Image "{image_name}" has platform "{detected}", ' @@ -797,6 +752,7 @@ def validate_image_platform(image_name: str, platform: str, tar_mapping=None) -> ) + # pylint: disable=R0914, def load_policy_from_arm_template_str( template_data: str, From 81a6f6b76964ec479f1ab84c434cfd49a6b2e987 Mon Sep 17 00:00:00 2001 From: Tingmao Wang Date: Wed, 13 May 2026 08:12:38 +0000 Subject: [PATCH 3/4] [confcom]: v2.0.1 --- src/confcom/HISTORY.rst | 4 ++++ src/confcom/setup.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/confcom/HISTORY.rst b/src/confcom/HISTORY.rst index ed59f115375..bd62481b6cb 100644 --- a/src/confcom/HISTORY.rst +++ b/src/confcom/HISTORY.rst @@ -3,6 +3,10 @@ Release History =============== +2.0.1 ++++++ +* Fix accidental docker dependency even when using tar + 2.0.0 +++++ * Fix path traversal when generating policies from untrusted image tar files diff --git a/src/confcom/setup.py b/src/confcom/setup.py index 4ecc4aeb1a2..e9f737686ec 100644 --- a/src/confcom/setup.py +++ b/src/confcom/setup.py @@ -19,7 +19,7 @@ logger.warn("Wheel is not available, disabling bdist_wheel hook") -VERSION = "2.0.0" +VERSION = "2.0.1" # The full list of classifiers is available at # https://pypi.python.org/pypi?%3Aaction=list_classifiers From 8b4f4ab19f07f180a618e6eb7aff468af171cf9c Mon Sep 17 00:00:00 2001 From: Tingmao Wang Date: Wed, 13 May 2026 08:33:32 +0000 Subject: [PATCH 4/4] Fix lint --- src/confcom/azext_confcom/security_policy.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/confcom/azext_confcom/security_policy.py b/src/confcom/azext_confcom/security_policy.py index 00f4ac968e7..87408f2a992 100644 --- a/src/confcom/azext_confcom/security_policy.py +++ b/src/confcom/azext_confcom/security_policy.py @@ -752,7 +752,6 @@ def validate_image_platform(image_name: str, platform: str, tar_mapping=None) -> ) - # pylint: disable=R0914, def load_policy_from_arm_template_str( template_data: str,