From 0254d6b4c20ef147d6fc61b938b65e48d17946a2 Mon Sep 17 00:00:00 2001 From: Cesar Gray Blanco Date: Wed, 19 Mar 2025 14:20:39 -0700 Subject: [PATCH] add checks for empty token retrieval --- .../helper/_ociartifactoperations.py | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/acrcssc/azext_acrcssc/helper/_ociartifactoperations.py b/src/acrcssc/azext_acrcssc/helper/_ociartifactoperations.py index c640968a3f3..25ff69780fc 100644 --- a/src/acrcssc/azext_acrcssc/helper/_ociartifactoperations.py +++ b/src/acrcssc/azext_acrcssc/helper/_ociartifactoperations.py @@ -83,9 +83,8 @@ def get_oci_artifact_continuous_patch(cmd, registry): oci_target_name = f"{CSSC_WORKFLOW_POLICY_REPOSITORY}/{CONTINUOUSPATCH_OCI_ARTIFACT_CONFIG}:{CONTINUOUSPATCH_OCI_ARTIFACT_CONFIG_TAG_V1}" - oci_artifacts = oras_client.pull( - target=oci_target_name, - stream=True) + oci_artifacts = oras_client.pull(target=oci_target_name, + stream=True) trigger_task = get_task(cmd, registry, CONTINUOUSPATCH_TASK_SCANREGISTRY_NAME) file_name = oci_artifacts[0] config = ContinuousPatchConfig().from_file(file_name, trigger_task) @@ -134,7 +133,6 @@ def _oras_client(registry): return client -# Need to check on this method once, if there's alternative to this def _get_acr_token(registry_name, subscription): logger.debug(f"Using CLI user credentials to log into {registry_name}") acr_login_with_token_cmd = [ @@ -148,21 +146,23 @@ def _get_acr_token(registry_name, subscription): ] try: - proc = subprocess.Popen( + result = subprocess.run( acr_login_with_token_cmd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - token = proc.stdout.read().strip().decode("utf-8") - # this suppresses the 'login' warning from the ACR request, if we need the error and does not come from the exception we can take it from here - # error_stderr=proc.stderr.read() - except subprocess.CalledProcessError as error: - unauthorized = ( - error.stderr - and (" 401" in error.stderr or "unauthorized" in error.stderr) - ) or ( - error.stdout - and (" 401" in error.stdout or "unauthorized" in error.stdout) + stderr=subprocess.PIPE, + text=True, # Ensures output is returned as a string + check=True # Raises CalledProcessError for non-zero exit codes ) + token = result.stdout.strip() + if not token or token == "": + logger.debug("Failed to retrieve ACR token: Token is empty.") + raise AzCLIError("Failed to retrieve ACR token. The token is empty.") + return token + except subprocess.CalledProcessError as error: + logger.debug(f"Error while retrieving ACR token: {error.stderr.strip()}") + + unauthorized = (error.stderr + and (" 401" in error.stderr or "unauthorized" in error.stderr)) if unauthorized: # As we shell out the the subprocess, I think checking for these @@ -175,7 +175,7 @@ def _get_acr_token(registry_name, subscription): " artifact store." ) from error - return token + raise AzCLIError(f"Failed to retrieve ACR token: {error.stderr.strip()}") from error class ContinuousPatchConfig: