From ce25329b9b127b99084857775d8e749108206edd Mon Sep 17 00:00:00 2001 From: Bin Xia Date: Mon, 11 Jul 2022 13:55:31 +0000 Subject: [PATCH] {AKS} Refine tests for azurekeyvaultkms * Add test to rotate key for public cluster + public/private key vault * Add test to create private cluster with private key vault, and rotate key --- .../tests/latest/test_aks_commands.py | 198 +++++++++++++++++- 1 file changed, 188 insertions(+), 10 deletions(-) diff --git a/src/aks-preview/azext_aks_preview/tests/latest/test_aks_commands.py b/src/aks-preview/azext_aks_preview/tests/latest/test_aks_commands.py index 2b9dae28350..82a780c5df4 100644 --- a/src/aks-preview/azext_aks_preview/tests/latest/test_aks_commands.py +++ b/src/aks-preview/azext_aks_preview/tests/latest/test_aks_commands.py @@ -3975,7 +3975,7 @@ def test_aks_create_with_network_plugin_none(self, resource_group, resource_grou @live_only() @AllowLargeResponse() @AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='centraluseuap') - def test_aks_create_with_azurekeyvaultkms(self, resource_group, resource_group_location): + def test_aks_create_with_azurekeyvaultkms_public_key_vault(self, resource_group, resource_group_location): aks_name = self.create_random_name('cliakstest', 16) kv_name = self.create_random_name('cliakstestkv', 16) identity_name = self.create_random_name('cliakstestidentity', 24) @@ -4009,10 +4009,10 @@ def test_aks_create_with_azurekeyvaultkms(self, resource_group, resource_group_l key = self.cmd(create_key, checks=[ self.check('attributes.enabled', True) ]).get_output_in_json() - key_id = key['key']['kid'] - assert key_id is not None + key_id_0 = key['key']['kid'] + assert key_id_0 is not None self.kwargs.update({ - 'key_id': key_id, + 'key_id': key_id_0, }) # assign access policy @@ -4029,7 +4029,27 @@ def test_aks_create_with_azurekeyvaultkms(self, resource_group, resource_group_l self.cmd(create_cmd, checks=[ self.check('provisioningState', 'Succeeded'), self.check('securityProfile.azureKeyVaultKms.enabled', True), - self.check('securityProfile.azureKeyVaultKms.keyId', key_id) + self.check('securityProfile.azureKeyVaultKms.keyId', key_id_0) + ]) + + key = self.cmd(create_key, checks=[ + self.check('attributes.enabled', True) + ]).get_output_in_json() + key_id_1 = key['key']['kid'] + assert key_id_1 is not None + self.kwargs.update({ + 'key_id': key_id_1, + }) + + # Rotate key + update_cmd = 'aks update --resource-group={resource_group} --name={name} ' \ + '--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \ + '--aks-custom-headers AKSHTTPCustomFeatures=Microsoft.ContainerService/AzureKeyVaultKmsPreview ' \ + '-o json' + self.cmd(update_cmd, checks=[ + self.check('provisioningState', 'Succeeded'), + self.check('securityProfile.azureKeyVaultKms.enabled', True), + self.check('securityProfile.azureKeyVaultKms.keyId', key_id_1) ]) # delete @@ -4041,7 +4061,7 @@ def test_aks_create_with_azurekeyvaultkms(self, resource_group, resource_group_l @live_only() @AllowLargeResponse() @AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='centraluseuap') - def test_aks_update_with_azurekeyvaultkms(self, resource_group, resource_group_location): + def test_aks_update_with_azurekeyvaultkms_public_key_vault(self, resource_group, resource_group_location): aks_name = self.create_random_name('cliakstest', 16) kv_name = self.create_random_name('cliakstestkv', 16) identity_name = self.create_random_name('cliakstestidentity', 24) @@ -4152,10 +4172,10 @@ def test_aks_create_with_azurekeyvaultkms_private_key_vault(self, resource_group key = self.cmd(create_key, checks=[ self.check('attributes.enabled', True) ]).get_output_in_json() - key_id = key['key']['kid'] - assert key_id is not None + key_id_0 = key['key']['kid'] + assert key_id_0 is not None self.kwargs.update({ - 'key_id': key_id, + 'key_id': key_id_0, }) # assign access policy @@ -4186,7 +4206,42 @@ def test_aks_create_with_azurekeyvaultkms_private_key_vault(self, resource_group self.cmd(create_cmd, checks=[ self.check('provisioningState', 'Succeeded'), self.check('securityProfile.azureKeyVaultKms.enabled', True), - self.check('securityProfile.azureKeyVaultKms.keyId', key_id), + self.check('securityProfile.azureKeyVaultKms.keyId', key_id_0), + self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"), + self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id) + ]) + + # enable public network access + enable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Enabled" -o json' + kv = self.cmd(enable_public_network_access, checks=[ + self.check('properties.provisioningState', 'Succeeded') + ]).get_output_in_json() + + key = self.cmd(create_key, checks=[ + self.check('attributes.enabled', True) + ]).get_output_in_json() + key_id_1 = key['key']['kid'] + assert key_id_1 is not None + self.kwargs.update({ + 'key_id': key_id_1, + }) + + # disable public network access + disable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Disabled" -o json' + kv = self.cmd(disable_public_network_access, checks=[ + self.check('properties.provisioningState', 'Succeeded') + ]).get_output_in_json() + + # Rotate key + update_cmd = 'aks update --resource-group={resource_group} --name={name} ' \ + '--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \ + '--azure-keyvault-kms-key-vault-network-access=Private --azure-keyvault-kms-key-vault-resource-id {kv_resource_id} ' \ + '--aks-custom-headers AKSHTTPCustomFeatures=Microsoft.ContainerService/AzureKeyVaultKmsPreview ' \ + '-o json' + self.cmd(update_cmd, checks=[ + self.check('provisioningState', 'Succeeded'), + self.check('securityProfile.azureKeyVaultKms.enabled', True), + self.check('securityProfile.azureKeyVaultKms.keyId', key_id_1), self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"), self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id) ]) @@ -4291,6 +4346,129 @@ def test_aks_update_with_azurekeyvaultkms_private_key_vault(self, resource_group self.is_empty(), ]) + @live_only() + @AllowLargeResponse() + @AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='centraluseuap', preserve_default_location=True) + def test_aks_create_with_azurekeyvaultkms_private_cluster_v1_private_key_vault(self, resource_group, resource_group_location): + aks_name = self.create_random_name('cliakstest', 16) + kv_name = self.create_random_name('cliakstestkv', 16) + identity_name = self.create_random_name('cliakstestidentity', 24) + self.kwargs.update({ + 'resource_group': resource_group, + 'name': aks_name, + "kv_name": kv_name, + "identity_name": identity_name, + 'ssh_key_value': self.generate_ssh_keys() + }) + + # create user-assigned identity + create_identity = 'identity create --resource-group={resource_group} --name={identity_name} -o json' + identity = self.cmd(create_identity).get_output_in_json() + identity_id = identity['id'] + identity_object_id = identity['principalId'] + assert identity_id is not None + assert identity_object_id is not None + self.kwargs.update({ + 'identity_id': identity_id, + 'identity_object_id': identity_object_id, + }) + + # create key vault and key + create_keyvault = 'keyvault create --resource-group={resource_group} --name={kv_name} -o json' + kv = self.cmd(create_keyvault, checks=[ + self.check('properties.provisioningState', 'Succeeded') + ]).get_output_in_json() + kv_resource_id = kv['id'] + assert kv_resource_id is not None + self.kwargs.update({ + 'kv_resource_id': kv_resource_id, + }) + + create_key = 'keyvault key create -n kms --vault-name {kv_name} -o json' + key = self.cmd(create_key, checks=[ + self.check('attributes.enabled', True) + ]).get_output_in_json() + key_id_0 = key['key']['kid'] + assert key_id_0 is not None + self.kwargs.update({ + 'key_id': key_id_0, + }) + + # assign access policy + set_policy = 'keyvault set-policy --resource-group={resource_group} --name={kv_name} ' \ + '--object-id {identity_object_id} --key-permissions encrypt decrypt -o json' + policy = self.cmd(set_policy, checks=[ + self.check('properties.provisioningState', 'Succeeded') + ]).get_output_in_json() + + # allow the identity approve private endpoint connection (Microsoft.KeyVault/vaults/privateEndpointConnectionsApproval/action) + create_role_assignment = 'role assignment create --role f25e0fa2-a7c8-4377-a976-54943a77a395 ' \ + '--assignee-object-id {identity_object_id} --assignee-principal-type "ServicePrincipal" ' \ + '--scope {kv_resource_id}' + role_assignment = self.cmd(create_role_assignment).get_output_in_json() + + # disable public network access + disable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Disabled" -o json' + kv = self.cmd(disable_public_network_access, checks=[ + self.check('properties.provisioningState', 'Succeeded') + ]).get_output_in_json() + + create_cmd = 'aks create --resource-group={resource_group} --name={name} ' \ + '--assign-identity {identity_id} --enable-private-cluster ' \ + '--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \ + '--azure-keyvault-kms-key-vault-network-access=Private --azure-keyvault-kms-key-vault-resource-id {kv_resource_id} ' \ + '--aks-custom-headers AKSHTTPCustomFeatures=Microsoft.ContainerService/AzureKeyVaultKmsPreview ' \ + '--ssh-key-value={ssh_key_value} -o json' + self.cmd(create_cmd, checks=[ + self.check('provisioningState', 'Succeeded'), + self.check('apiServerAccessProfile.enablePrivateCluster', 'True'), + self.check('securityProfile.azureKeyVaultKms.enabled', True), + self.check('securityProfile.azureKeyVaultKms.keyId', key_id_0), + self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"), + self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id) + ]) + + # enable public network access + enable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Enabled" -o json' + kv = self.cmd(enable_public_network_access, checks=[ + self.check('properties.provisioningState', 'Succeeded') + ]).get_output_in_json() + + key = self.cmd(create_key, checks=[ + self.check('attributes.enabled', True) + ]).get_output_in_json() + key_id_1 = key['key']['kid'] + assert key_id_1 is not None + self.kwargs.update({ + 'key_id': key_id_1, + }) + + # disable public network access + disable_public_network_access = 'keyvault update --resource-group={resource_group} --name={kv_name} --public-network-access "Disabled" -o json' + kv = self.cmd(disable_public_network_access, checks=[ + self.check('properties.provisioningState', 'Succeeded') + ]).get_output_in_json() + + # Rotate key + update_cmd = 'aks update --resource-group={resource_group} --name={name} ' \ + '--enable-azure-keyvault-kms --azure-keyvault-kms-key-id={key_id} ' \ + '--azure-keyvault-kms-key-vault-network-access=Private --azure-keyvault-kms-key-vault-resource-id {kv_resource_id} ' \ + '--aks-custom-headers AKSHTTPCustomFeatures=Microsoft.ContainerService/AzureKeyVaultKmsPreview ' \ + '-o json' + self.cmd(update_cmd, checks=[ + self.check('provisioningState', 'Succeeded'), + self.check('securityProfile.azureKeyVaultKms.enabled', True), + self.check('securityProfile.azureKeyVaultKms.keyId', key_id_1), + self.check('securityProfile.azureKeyVaultKms.keyVaultNetworkAccess', "Private"), + self.check('securityProfile.azureKeyVaultKms.keyVaultResourceId', kv_resource_id) + ]) + + # delete + cmd = 'aks delete --resource-group={resource_group} --name={name} --yes --no-wait' + self.cmd(cmd, checks=[ + self.is_empty(), + ]) + @live_only() @AllowLargeResponse() @AKSCustomResourceGroupPreparer(random_name_length=17, name_prefix='clitest', location='centraluseuap')