From 69875f6cf280f3a56fd0a86e75879ff9e3c46c7c Mon Sep 17 00:00:00 2001 From: Charlie Drage Date: Tue, 7 Jun 2016 10:23:12 -0400 Subject: [PATCH] Add a new 'library' for k8s/openshift providers. This is the first commit and first initialization into refactoring the OpenShift provider in order to create a library that is both compatible with OpenShift as well as Kubernetes. With this 'library', the *ONLY* thing that is passed in is an object of the .kube/config configuration. After passing the configuration, you may make API calls such as .create(object) and .delete(object). --- atomicapp/constants.py | 2 + atomicapp/plugin.py | 4 +- atomicapp/providers/kubernetes.py | 387 ++++++++++-------- atomicapp/providers/lib/kubeshift/__init__.py | 0 atomicapp/providers/lib/kubeshift/client.py | 57 +++ .../providers/lib/kubeshift/exceptions.py | 42 ++ atomicapp/providers/lib/kubeshift/kubebase.py | 341 +++++++++++++++ .../lib/{ => kubeshift}/kubeconfig.py | 64 +++ .../providers/lib/kubeshift/kubernetes.py | 171 ++++++++ atomicapp/providers/openshift.py | 2 +- tests/units/nulecule/test_kubeconfig.py | 2 +- .../providers/test_kubernetes_provider.py | 4 - 12 files changed, 889 insertions(+), 187 deletions(-) create mode 100644 atomicapp/providers/lib/kubeshift/__init__.py create mode 100644 atomicapp/providers/lib/kubeshift/client.py create mode 100644 atomicapp/providers/lib/kubeshift/exceptions.py create mode 100644 atomicapp/providers/lib/kubeshift/kubebase.py rename atomicapp/providers/lib/{ => kubeshift}/kubeconfig.py (66%) create mode 100644 atomicapp/providers/lib/kubeshift/kubernetes.py diff --git a/atomicapp/constants.py b/atomicapp/constants.py index f2144aa5..0ab748de 100644 --- a/atomicapp/constants.py +++ b/atomicapp/constants.py @@ -75,6 +75,8 @@ PROVIDER_TLS_VERIFY_KEY = "provider-tlsverify" PROVIDER_CA_KEY = "provider-cafile" +K8S_DEFAULT_API = "http://localhost:8080" + # Persistent Storage Formats PERSISTENT_STORAGE_FORMAT = ["ReadWriteOnce", "ReadOnlyMany", "ReadWriteMany"] diff --git a/atomicapp/plugin.py b/atomicapp/plugin.py index 7234c1f7..2b13b498 100644 --- a/atomicapp/plugin.py +++ b/atomicapp/plugin.py @@ -40,7 +40,9 @@ class Provider(object): dryrun = None container = False config_file = None - __artifacts = None + + # By default, no artifacts are loaded + __artifacts = [] @property def artifacts(self): diff --git a/atomicapp/providers/kubernetes.py b/atomicapp/providers/kubernetes.py index 5981ccfe..92c2ed2e 100644 --- a/atomicapp/providers/kubernetes.py +++ b/atomicapp/providers/kubernetes.py @@ -20,14 +20,21 @@ import anymarkup import logging import os -from string import Template -from atomicapp.constants import (LOGGER_COCKPIT, +from atomicapp.constants import (PROVIDER_AUTH_KEY, + ANSWERS_FILE, + DEFAULT_NAMESPACE, LOGGER_DEFAULT, - PERSISTENT_STORAGE_FORMAT) + PROVIDER_API_KEY, + PROVIDER_CA_KEY, + PROVIDER_TLS_VERIFY_KEY, + LOGGER_COCKPIT, + K8S_DEFAULT_API) from atomicapp.plugin import Provider, ProviderFailedException -from atomicapp.utils import Utils +from atomicapp.providers.lib.kubeshift.kubeconfig import KubeConfig +from atomicapp.providers.lib.kubeshift.client import Client +from atomicapp.utils import Utils cockpit_logger = logging.getLogger(LOGGER_COCKPIT) logger = logging.getLogger(LOGGER_DEFAULT) @@ -38,155 +45,227 @@ class KubernetesProvider(Provider): This class implements deploy, stop and undeploy of an atomicapp on Kubernetes provider. """ + + # Class variables key = "kubernetes" + namespace = DEFAULT_NAMESPACE + k8s_artifacts = {} + + # From the provider configuration config_file = None - kubectl = None - def init(self): - self.namespace = "default" + # Essential provider parameters + provider_api = None + provider_auth = None + provider_tls_verify = None + provider_ca = None - self.k8s_manifests = [] + def init(self): + self.k8s_artifacts = {} logger.debug("Given config: %s", self.config) if self.config.get("namespace"): self.namespace = self.config.get("namespace") logger.info("Using namespace %s", self.namespace) - if self.container: - self.kubectl = self._find_kubectl(Utils.getRoot()) - kube_conf_path = "/etc/kubernetes" - host_kube_conf_path = Utils.get_real_abspath(kube_conf_path) - if not os.path.exists(kube_conf_path) and os.path.exists(host_kube_conf_path): - if self.dryrun: - logger.info("DRY-RUN: link %s from %s" % (kube_conf_path, host_kube_conf_path)) - else: - os.symlink(host_kube_conf_path, kube_conf_path) + + self._process_artifacts() + + if self.dryrun: + return + + ''' + Config_file: + If a config_file has been provided, use the configuration + from the file and load the associated generated file. + If a config_file exists (--provider-config) use that. + + Params: + If any provider specific parameters have been provided, + load the configuration through the answers.conf file + + .kube/config: + If no config file or params are provided by user then try to find and + use a config file at the default location. + + no config at all: + If no .kube/config file can be found then try to connect to the default + unauthenticated http://localhost:8080/api end-point. + ''' + + default_config_loc = os.path.join( + Utils.getRoot(), Utils.getUserHome().strip('/'), '.kube/config') + + if self.config_file: + logger.debug("Provider configuration provided") + self.api = Client(KubeConfig.from_file(self.config_file), "kubernetes") + elif self._check_required_params(): + logger.debug("Generating .kube/config from given parameters") + self.api = Client(self._from_required_params(), "kubernetes") + elif os.path.isfile(default_config_loc): + logger.debug(".kube/config exists, using default configuration file") + self.api = Client(KubeConfig.from_file(default_config_loc), "kubernetes") else: - self.kubectl = self._find_kubectl() + self.config["provider-api"] = K8S_DEFAULT_API + self.api = Client(self._from_required_params(), "kubernetes") + + # Check if the namespace that the app is being deployed to is available + self._check_namespaces() + + def _build_param_dict(self): + # Initialize the values + paramdict = {PROVIDER_API_KEY: self.provider_api, + PROVIDER_AUTH_KEY: self.provider_auth, + PROVIDER_TLS_VERIFY_KEY: self.provider_tls_verify, + PROVIDER_CA_KEY: self.provider_ca} + + # Get values from the loaded answers.conf / passed CLI params + for k in paramdict.keys(): + paramdict[k] = self.config.get(k) + + return paramdict + + def _check_required_params(self, exception=False): + ''' + This checks to see if required parameters associated to the Kubernetes + provider are passed. Only PROVIDER_API_KEY is *required*. Token may be blank. + ''' + + paramdict = self._build_param_dict() + logger.debug("List of parameters passed: %s" % paramdict) + + # Check that the required parameters are passed. If not, error out. + for k in [PROVIDER_API_KEY]: + if paramdict[k] is None: + if exception: + msg = "You need to set %s in %s or pass it as a CLI param" % (k, ANSWERS_FILE) + raise ProviderFailedException(msg) + else: + return False + + return True + + def _from_required_params(self): + ''' + Create a default configuration from passed environment parameters. + ''' + + self._check_required_params(exception=True) + paramdict = self._build_param_dict() + + # Generate the configuration from the paramters + config = KubeConfig().from_params(api=paramdict[PROVIDER_API_KEY], + auth=paramdict[PROVIDER_AUTH_KEY], + ca=paramdict[PROVIDER_CA_KEY], + verify=paramdict[PROVIDER_TLS_VERIFY_KEY]) + return config + + def _check_namespaces(self): + ''' + This function checks to see whether or not the namespaces created in the cluster match the + namespace that is associated and/or provided in the deployed application + ''' + + # Get the namespaces and output the currently used ones + namespace_list = self.api.namespaces() + logger.debug("There are currently %s namespaces in the cluster." % str(len(namespace_list))) + + # Create a namespace list + namespaces = [] + for ns in namespace_list: + namespaces.append(ns["metadata"]["name"]) + + # Output the namespaces and check to see if the one provided exists + logger.debug("Namespaces: %s" % namespaces) + if self.namespace not in namespaces: + msg = "%s namespace does not exist. Please create the namespace and try again." % self.namespace + raise ProviderFailedException(msg) - if not self.dryrun: - if not os.access(self.kubectl, os.X_OK): - raise ProviderFailedException("Command: " + self.kubectl + " not found") + def _process_artifacts(self): + """ + Parse each Kubernetes file and convert said format into an Object for + deployment. + """ + for artifact in self.artifacts: + logger.debug("Processing artifact: %s", artifact) + data = None - # Check if Kubernetes config file is accessible, but only - # if one was provided by the user; config file is optional. - if self.config_file: - self.checkConfigFile() + # Open and parse the artifact data + with open(os.path.join(self.path, artifact), "r") as fp: + data = anymarkup.parse(fp, force_types=None) - def _find_kubectl(self, prefix=""): - """Determine the path to the kubectl program on the host. - 1) Check the config for a provider_cli in the general section - remember to add /host prefix - 2) Search /usr/bin:/usr/local/bin + # Process said artifacts + self._process_artifact_data(artifact, data) - Use the first valid value found + def _process_artifact_data(self, artifact, data): """ + Process the data for an artifact - if self.dryrun: - # Testing env does not have kubectl in it - return "/usr/bin/kubectl" + Args: + artifact (str): Artifact name + data (dict): Artifact data + """ - test_paths = ['/usr/bin/kubectl', '/usr/local/bin/kubectl'] - if self.config.get("provider_cli"): - logger.info("caller gave provider_cli: " + self.config.get("provider_cli")) - test_paths.insert(0, self.config.get("provider_cli")) + # Check if kind exists + if "kind" not in data.keys(): + raise ProviderFailedException( + "Error processing %s artifact. There is no kind" % artifact) - for path in test_paths: - test_path = prefix + path - logger.info("trying kubectl at " + test_path) - kubectl = test_path - if os.access(kubectl, os.X_OK): - logger.info("found kubectl at " + test_path) - return kubectl + # Change to lower case so it's easier to parse + kind = data["kind"].lower() - raise ProviderFailedException("No kubectl found in %s" % ":".join(test_paths)) + if kind not in self.k8s_artifacts.keys(): + self.k8s_artifacts[kind] = [] - def _call(self, cmd): - """Calls given command + # Fail if there is no metadata + if 'metadata' not in data: + raise ProviderFailedException( + "Error processing %s artifact. There is no metadata object" % artifact) - :arg cmd: Command to be called in a form of list - :raises: Exception - """ + # Change to the namespace specified on init() + data['metadata']['namespace'] = self.namespace - if self.dryrun: - logger.info("DRY-RUN: %s", " ".join(cmd)) + if 'labels' not in data['metadata']: + data['metadata']['labels'] = {'namespace': self.namespace} else: - ec, stdout, stderr = Utils.run_cmd(cmd, checkexitcode=True) - return stdout + data['metadata']['labels']['namespace'] = self.namespace - def process_k8s_artifacts(self): - """Processes Kubernetes manifests files and checks if manifest under - process is valid. + self.k8s_artifacts[kind].append(data) + + ''' + This is DEPRECATED and not needed anymore as we check the /resource URL of the kubernetes api against the artifact + def _identify_api(self, artifact, data): """ - for artifact in self.artifacts: - data = None - with open(os.path.join(self.path, artifact), "r") as fp: - logger.debug(os.path.join(self.path, artifact)) - try: - data = anymarkup.parse(fp) - except Exception: - msg = "Error processing %s artifcats, Error:" % os.path.join( - self.path, artifact) - cockpit_logger.error(msg) - raise - if "kind" in data: - self.k8s_manifests.append((data["kind"].lower(), artifact)) - else: - apath = os.path.join(self.path, artifact) - raise ProviderFailedException("Malformed kube file: %s" % apath) - - def _resource_identity(self, path): - """Finds the Kubernetes resource name / identity from resource manifest - and raises if manifest is not supported. - - :arg path: Absolute path to Kubernetes resource manifest - - :return: str -- Resource name / identity - - :raises: ProviderFailedException + Make sure that the artifact is using the correct API + + Args: + artifact (str): Artifact name + data (dict): Artifact data """ - data = anymarkup.parse_file(path) if data["apiVersion"] == "v1": - return data["metadata"]["name"] + pass elif data["apiVersion"] in ["v1beta3", "v1beta2", "v1beta1"]: - msg = ("%s is not supported API version, update Kubernetes " + msg = ("%s is not a supported API version, update Kubernetes " "artifacts to v1 API version. Error in processing " - "%s manifest." % (data["apiVersion"], path)) + "%s manifest." % (data["apiVersion"], artifact)) raise ProviderFailedException(msg) else: - raise ProviderFailedException("Malformed kube file: %s" % path) - - def _scale_replicas(self, path, replicas=0): - """Scales replicationController to specified replicas size - - :arg path: Path to replicationController manifest - :arg replicas: Replica size to scale to. - """ - rname = self._resource_identity(path) - cmd = [self.kubectl, "scale", "rc", rname, - "--replicas=%s" % str(replicas), - "--namespace=%s" % self.namespace] - if self.config_file: - cmd.append("--kubeconfig=%s" % self.config_file) - - self._call(cmd) + raise ProviderFailedException("Malformed kubernetes artifact: %s" % artifact) + ''' def run(self): - """Deploys the app by given resource manifests. + """ + Deploys the app by given resource artifacts. """ logger.info("Deploying to Kubernetes") - self.process_k8s_artifacts() - - for kind, artifact in self.k8s_manifests: - if not artifact: - continue - k8s_file = os.path.join(self.path, artifact) - - cmd = [self.kubectl, "create", "-f", k8s_file, "--namespace=%s" % self.namespace] - if self.config_file: - cmd.append("--kubeconfig=%s" % self.config_file) - self._call(cmd) + for kind, objects in self.k8s_artifacts.iteritems(): + for artifact in objects: + if self.dryrun: + logger.info("DRY-RUN: Deploying k8s KIND: %s, ARTIFACT: %s" + % (kind, artifact)) + else: + self.api.create(artifact, self.namespace) def stop(self): """Undeploys the app by given resource manifests. @@ -194,71 +273,19 @@ def stop(self): the resource from cluster. """ logger.info("Undeploying from Kubernetes") - self.process_k8s_artifacts() - - for kind, artifact in self.k8s_manifests: - if not artifact: - continue - path = os.path.join(self.path, artifact) - - if kind in ["ReplicationController", "rc", "replicationcontroller"]: - self._scale_replicas(path, replicas=0) - - cmd = [self.kubectl, "delete", "-f", path, "--namespace=%s" % self.namespace] - if self.config_file: - cmd.append("--kubeconfig=%s" % self.config_file) - self._call(cmd) + for kind, objects in self.k8s_artifacts.iteritems(): + for artifact in objects: + if self.dryrun: + logger.info("DRY-RUN: Deploying k8s KIND: %s, ARTIFACT: %s" + % (kind, artifact)) + else: + self.api.delete(artifact, self.namespace) + # TODO def persistent_storage(self, graph, action): - """ - Actions are either: run, stop or uninstall as per the Requirements class - Curently run is the only function implemented for k8s persistent storage - """ - - logger.debug("Persistent storage enabled! Running action: %s" % action) - - if graph["accessMode"] not in PERSISTENT_STORAGE_FORMAT: - raise ProviderFailedException("{} is an invalid storage format " - "(choose from {})" - .format(graph["accessMode"], - ', '.join(PERSISTENT_STORAGE_FORMAT))) - - if action not in ['run']: - logger.warning( - "%s action is not available for provider %s. Doing nothing." % - (action, self.key)) - return + pass - self._check_persistent_volumes() - - # Get the path of the persistent storage yaml file includes in /external - # Plug the information from the graph into the persistent storage file - base_path = os.path.dirname(os.path.realpath(__file__)) - template_path = os.path.join(base_path, - 'external/kubernetes/persistent_storage.yaml') - with open(template_path, 'r') as f: - content = f.read() - template = Template(content) - rendered_template = template.safe_substitute(graph) - - tmp_file = Utils.getTmpFile(rendered_template, '.yaml') - - # Pass the .yaml file and execute - if action is "run": - cmd = [self.kubectl, "create", "-f", tmp_file, "--namespace=%s" % self.namespace] - if self.config_file: - cmd.append("--kubeconfig=%s" % self.config_file) - self._call(cmd) - os.unlink(tmp_file) - - def _check_persistent_volumes(self): - cmd = [self.kubectl, "get", "pv"] - if self.config_file: - cmd.append("--kubeconfig=%s" % self.config_file) - lines = self._call(cmd) - - # If there are no persistent volumes to claim, warn the user - if not self.dryrun and len(lines.split("\n")) == 2: - logger.warning("No persistent volumes detected in Kubernetes. Volume claim will not " - "initialize unless persistent volumes exist.") + # TODO + def _check_persistent_volumes(self, graph, action): + pass diff --git a/atomicapp/providers/lib/kubeshift/__init__.py b/atomicapp/providers/lib/kubeshift/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/atomicapp/providers/lib/kubeshift/client.py b/atomicapp/providers/lib/kubeshift/client.py new file mode 100644 index 00000000..ff59b8b2 --- /dev/null +++ b/atomicapp/providers/lib/kubeshift/client.py @@ -0,0 +1,57 @@ +""" + Copyright 2014-2016 Red Hat, Inc. + + This file is part of Atomic App. + + Atomic App is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Atomic App is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with Atomic App. If not, see . +""" + +from atomicapp.providers.lib.kubeshift.kubernetes import KubeKubernetesClient +from atomicapp.providers.lib.kubeshift.exceptions import KubeClientError +from atomicapp.constants import LOGGER_DEFAULT +import logging +logger = logging.getLogger(LOGGER_DEFAULT) + + +class Client(object): + + def __init__(self, config, provider): + ''' + + Args: + config (obj): Object of the configuration data + provider (str): String value of the provider that is being used + + ''' + self.config = config + self.provider = provider + + # Choose the type of provider that is being used. Error out if it is not available + if provider is "kubernetes": + self.connection = KubeKubernetesClient(config) + logger.debug("Using Kubernetes Provider KubeClient library") + else: + raise KubeClientError("No provider by that name.") + + # Create an object using its respective API + def create(self, obj, namespace="default"): + self.connection.create(obj, namespace) + + # Delete an object using its respective API + def delete(self, obj, namespace="default"): + self.connection.delete(obj, namespace) + + # Current support: kubernetes only + def namespaces(self): + return self.connection.namespaces() diff --git a/atomicapp/providers/lib/kubeshift/exceptions.py b/atomicapp/providers/lib/kubeshift/exceptions.py new file mode 100644 index 00000000..eb00886b --- /dev/null +++ b/atomicapp/providers/lib/kubeshift/exceptions.py @@ -0,0 +1,42 @@ +""" + Copyright 2014-2016 Red Hat, Inc. + + This file is part of Atomic App. + + Atomic App is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Atomic App is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with Atomic App. If not, see . +""" + + +class KubeOpenshiftError(Exception): + pass + + +class KubeKubernetesError(Exception): + pass + + +class KubeConfigError(Exception): + pass + + +class KubeClientError(Exception): + pass + + +class KubeConnectionError(Exception): + pass + + +class KubeBaseError(Exception): + pass diff --git a/atomicapp/providers/lib/kubeshift/kubebase.py b/atomicapp/providers/lib/kubeshift/kubebase.py new file mode 100644 index 00000000..9e3aee35 --- /dev/null +++ b/atomicapp/providers/lib/kubeshift/kubebase.py @@ -0,0 +1,341 @@ +""" + Copyright 2014-2016 Red Hat, Inc. + + This file is part of Atomic App. + + Atomic App is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Atomic App is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with Atomic App. If not, see . +""" + +import requests +import websocket +import tempfile +import base64 +import ssl +from requests.exceptions import SSLError +from atomicapp.providers.lib.kubeshift.exceptions import (KubeBaseError, + KubeConnectionError) +from atomicapp.constants import LOGGER_DEFAULT +import logging +logger = logging.getLogger(LOGGER_DEFAULT) + + +class KubeBase(object): + + ''' + The role of Kube Base is to parse the Kube Config file and create an + understandable API as well as initiation of connection to + Kubernetes-based APIs (OpenShift/Kubernetes). + + ''' + cluster = None + user = None + token = None + client_certification = None + client_key = None + certificate_authority_data = None # Not yet implemented + certificate_authority = None + certificate_ca = None # Not yet implemented + insecure_skip_tls_verify = False + + def __init__(self, config): + ''' + Args: + config (object): An object of the .kube/config configuration + ''' + self.kubeconfig = config + + # Gather the "current-context" from .kube/config which lists what the + # associated cluster, user, token, etc. is being used. + if "current-context" not in config: + raise KubeBaseError("'current-context' needs to be set within .kube/config") + else: + self.current_context = config["current-context"] + + # Gather the context and cluster details of .kube/config based upon the current_context + kubeconfig_context = self._contexts()[self.current_context] + kubeconfig_cluster = kubeconfig_context['cluster'] + self.cluster = self._clusters()[kubeconfig_cluster] + + # Gather cluster information (certificate authority) + if "certificate-authority" in self.cluster: + self.certificate_authority = self.cluster["certificate-authority"] + + if "insecure-skip-tls-verify" in self.cluster: + self.insecure_skip_tls_verify = self.cluster["insecure-skip-tls-verify"] + + # If a 'user' is present, gather the information in order to retrieve the token(s), + # certificate(s) as well as client-key. A user is OPTIONAL in the .kube/config data + # and hence the if statement. + if "user" in kubeconfig_context: + + kubeconfig_user = kubeconfig_context['user'] + self.user = self._users()[kubeconfig_user] + + if "token" in self.user: + self.token = self.user['token'] + + if "client-certificate" in self.user: + self.client_certification = self.user['client-certificate'] + + if "client-key" in self.user: + self.client_key = self.user['client-key'] + + # Initialize the connection using all the .kube/config credentials + self.api = self._connection() + + def request(self, method, url, data=None): + ''' + Completes the request to the API and fails if the status_code is != 200/201 + + Args: + method (str): put/get/post/patch + url (str): url of the api call + data (object): object of the data that is being passed (will be converted to json) + ''' + status_code = None + return_data = None + + try: + res = self._request_method(method, url, data) + status_code = res.status_code + return_data = res.json() + except requests.exceptions.ConnectTimeout: + msg = "Timeout when connecting to %s" % url + raise KubeConnectionError(msg) + except requests.exceptions.ReadTimeout: + msg = "Timeout when reading from %s" % url + raise KubeConnectionError(msg) + except requests.exceptions.ConnectionError: + msg = "Refused connection to %s" % url + raise KubeConnectionError(msg) + except SSLError: + raise KubeConnectionError("SSL/TLS ERROR: invalid certificate") + except ValueError: + return_data = None + + # 200 = OK + # 201 = PENDING + # EVERYTHING ELSE == FAIL + if status_code is not 200 and status_code is not 201: + raise KubeConnectionError("Unable to complete request: Status: %s, Error: %s" + % (status_code, return_data)) + return return_data + + def websocket_request(self, url, outfile=None): + ''' + Due to the requests library not supporting SPDY, websocket(s) are required + to communicate to the API. + + Args: + url (str): URL of the API + outfile (str): path of the outfile/data. + ''' + url = 'wss://' + url.split('://', 1)[-1] + logger.debug('Converted http to wss url: {}'.format(url)) + results = [] + + ws = websocket.WebSocketApp( + url, + on_message=lambda ws, message: self._handle_exec_reply(ws, message, results, outfile)) + + ws.run_forever(sslopt={ + 'ca_certs': self.cert_ca if self.cert_ca is not None else ssl.CERT_NONE, + 'cert_reqs': ssl.CERT_REQUIRED if self.insecure_skip_tls_verify else ssl.CERT_NONE}) + + # If an outfile was not provided, return the results in its entirety + if not outfile: + return ''.join(results) + + def get_resources(self, url): + ''' + Get the resources available to the API. This is a list of all available + API calls that can be made to the API. + ''' + data = self.request("get", url) + resources = data["resources"] + resources = [res['name'] for res in resources] + return resources + + def test_connection(self, url): + self.api.request("get", url) + logger.debug("Connection successfully tested") + + @staticmethod + def cert_file(data, key): + ''' + Some certificate .kube/config components are required to be a filename. + + Returns either the filename or a tmp location of said data in a file. + All certificates used with Kubernetes are base64 encoded and thus need to be decoded + + Keys which have "-data" associated with the name are base64 encoded. All others are not. + ''' + + # If it starts with /, we assume it's a filename so we just return that. + if data.startswith('/'): + return data + + # If it's data, we assume it's a certificate and we decode and write to a tmp file + # If the base64 param has been passed as true, we decode the data. + else: + with tempfile.NamedTemporaryFile(delete=False) as f: + # If '-data' is included in the keyname, it's a base64 encoded string and + # is required to be decoded + if "-data" in key: + f.write(base64.b64decode(data)) + else: + f.write(data) + return f.name + + @staticmethod + def kind_to_resource_name(kind): + """ + Converts kind to resource name. It is same logics + as in k8s.io/k8s/pkg/api/meta/restmapper.go (func KindToResource) + Example: + Pod -> pods + Policy - > policies + BuildConfig - > buildconfigs + + Args: + kind (str): Kind of the object + + Returns: + Resource name (str) (kind in plural form) + """ + singular = kind.lower() + if singular.endswith("status"): + plural = singular + "es" + else: + if singular[-1] == "s": + plural = singular + elif singular[-1] == "y": + plural = singular.rstrip("y") + "ies" + else: + plural = singular + "s" + return plural + + def _contexts(self): + ''' + Parses the contexts and formats it in a name = object way. + ex. + 'foobar': { name: 'foobar', context: 'foo' } + ''' + contexts = {} + if "contexts" not in self.kubeconfig: + raise KubeBaseError("No contexts within the .kube/config file") + for f in self.kubeconfig["contexts"]: + contexts[f["name"]] = f["context"] + return contexts + + def _clusters(self): + ''' + Parses the clusters and formats it in a name = object way. + ex. + 'foobar': { name: 'foobar', cluster: 'foo' } + ''' + clusters = {} + if "clusters" not in self.kubeconfig: + raise KubeBaseError("No clusters within the .kube/config file") + for f in self.kubeconfig["clusters"]: + clusters[f["name"]] = f["cluster"] + return clusters + + def _users(self): + ''' + Parses the users and formats it in a name = object way. + ex. + 'foobar': { name: 'foobar', user: 'foo' } + ''' + users = {} + if "users" not in self.kubeconfig: + raise KubeBaseError("No users within the .kube/config file") + for f in self.kubeconfig["users"]: + users[f["name"]] = f["user"] + return users + + def _connection(self): + ''' + Initializes the required requests session certs / token / authentication + in order to communicate with the API + ''' + connection = requests.Session() + + # CA Certificate for TLS verification + if self.certificate_authority: + connection.verify = self.cert_file( + self.certificate_authority, + "certificate-authority") + + # Check to see if verification has been disabled, if it has + # disable tls-verification + if self.insecure_skip_tls_verify: + connection.verify = False + # Disable the 'InsecureRequestWarning' notifications. + # As per: https://github.com/kennethreitz/requests/issues/2214 + # Instead make a large one-time noticable warning instead + requests.packages.urllib3.disable_warnings() + logger.warning("CAUTION: TLS verification has been DISABLED") + else: + logger.debug("Verification will be required for all API calls") + + # If we're using a token, use it, otherwise it's assumed the user uses + # client-certificate and client-key + if self.token: + connection.headers["Authorization"] = "Bearer %s" % self.token + + # Lastly, if we have client-certificate and client-key in the .kube/config + # we add them to the connection as a cert + if self.client_certification and self.client_key: + connection.cert = ( + self.cert_file(self.client_certification, "client-certificate"), + self.cert_file(self.client_key, "client-key") + ) + + return connection + + def _handle_ws_reply(self, ws, message, results, outfile=None): + """ + Handle websocket reply messages for each exec call + """ + # FIXME: For some reason, we do not know why, we need to ignore the + # 1st char of the message, to generate a meaningful result + cleaned_msg = message[1:] + if outfile: + with open(outfile, 'ab') as f: + f.write(cleaned_msg) + else: + results.append(cleaned_msg) + + def _request_method(self, method, url, data): + ''' + Converts the method to the most appropriate request and calls it. + + Args: + method (str): put/get/post/patch + url (str): url of the api call + data (object): object of the data that is being passed (will be converted to json) + ''' + if method.lower() == "get": + res = self.api.get(url) + elif method.lower() == "post": + res = self.api.post(url, json=data) + elif method.lower() == "put": + res = self.api.put(url, json=data) + elif method.lower() == "delete": + res = self.api.delete(url, json=data) + elif method.lower() == "patch": + headers = {"Content-Type": "application/json-patch+json"} + res = self.api.patch(url, json=data, headers=headers) + return res diff --git a/atomicapp/providers/lib/kubeconfig.py b/atomicapp/providers/lib/kubeshift/kubeconfig.py similarity index 66% rename from atomicapp/providers/lib/kubeconfig.py rename to atomicapp/providers/lib/kubeshift/kubeconfig.py index a61c7f08..778d00d8 100644 --- a/atomicapp/providers/lib/kubeconfig.py +++ b/atomicapp/providers/lib/kubeshift/kubeconfig.py @@ -13,6 +13,70 @@ class KubeConfig(object): + @staticmethod + def from_file(filename): + ''' + Load a file using anymarkup + + Params: + filename (str): File location + ''' + + return anymarkup.parse_file(filename) + + @staticmethod + def from_params(api=None, auth=None, ca=None, verify=True): + ''' + Creates a .kube/config configuration as an + object based upon the arguments given. + + Params: + api(str): API URL of the server + auth(str): Authentication key for the server + ca(str): The certificate being used. This can be either a file location or a base64 encoded string + verify(bool): true/false of whether or not certificate verification is enabled + + Returns: + config(obj): An object file of generate .kube/config + + ''' + config = { + "clusters": [ + { + "name": "self", + "cluster": { + }, + }, + ], + "users": [ + { + "name": "self", + "user": { + "token": "" + }, + }, + ], + "contexts": [ + { + "name": "self", + "context": { + "cluster": "self", + "user": "self", + }, + } + ], + "current-context": "self", + } + if api: + config['clusters'][0]['cluster']['server'] = api + + if auth: + config['users'][0]['user']['token'] = auth + + if ca: + config['clusters'][0]['cluster']['certificate-authority'] = ca + return config + @staticmethod def parse_kubeconf(filename): """" diff --git a/atomicapp/providers/lib/kubeshift/kubernetes.py b/atomicapp/providers/lib/kubeshift/kubernetes.py new file mode 100644 index 00000000..7711edc1 --- /dev/null +++ b/atomicapp/providers/lib/kubeshift/kubernetes.py @@ -0,0 +1,171 @@ +""" + Copyright 2014-2016 Red Hat, Inc. + + This file is part of Atomic App. + + Atomic App is free software: you can redistribute it and/or modify + it under the terms of the GNU Lesser General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + Atomic App is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with Atomic App. If not, see . +""" + +from urlparse import urljoin +from urllib import urlencode +from atomicapp.providers.lib.kubeshift.kubebase import KubeBase +from atomicapp.providers.lib.kubeshift.exceptions import (KubeKubernetesError) +from atomicapp.constants import LOGGER_DEFAULT +import logging +import re +logger = logging.getLogger(LOGGER_DEFAULT) + + +class KubeKubernetesClient(object): + + def __init__(self, config): + ''' + + Args: + config (obj): Object of the configuration data + + ''' + + # Pass in the configuration data (.kube/config object) to the KubeBase + self.api = KubeBase(config) + + # Check the API url + url = self.api.cluster['server'] + if not re.match('(?:http|https)://', url): + raise KubeKubernetesError("Kubernetes API URL does not include HTTP or HTTPS") + + # Gather what end-points we will be using + self.k8s_api = urljoin(url, "api/v1/") + + # Test the connection before proceeding + self.api.test_connection(self.k8s_api) + + # Gather the resource names which will be used for the 'kind' API calls + self.k8s_api_resources = self.api.get_resources(self.k8s_api) + + def create(self, obj, namespace): + ''' + Create an object from the Kubernetes cluster + ''' + name = self._get_metadata_name(obj) + kind, url = self._generate_kurl(obj, namespace) + self.api.request("post", url, data=obj) + logger.info("%s '%s' successfully created" % (kind.capitalize(), name)) + + def delete(self, obj, namespace): + ''' + Delete an object from the Kubernetes cluster + + Args: + obj (object): Object of the artifact being modified + namesapce (str): Namespace of the kubernetes cluster to be used + replicates (int): Default 0, size of the amount of replicas to scale + + *Note* + Replication controllers must scale to 0 in order to delete pods. + Kubernetes 1.3 will implement server-side cascading deletion, but + until then, it's mandatory to scale to 0 + https://github.com/kubernetes/kubernetes/blob/master/docs/proposals/garbage-collection.md + + ''' + name = self._get_metadata_name(obj) + kind, url = self._generate_kurl(obj, namespace, name) + + if kind in ['rcs', 'replicationcontrollers']: + self.scale(obj, namespace) + + self.api.request("delete", url) + logger.info("%s '%s' successfully deleted" % (kind.capitalize(), name)) + + def scale(self, obj, namespace, replicas=0): + ''' + By default we scale back down to 0. This function takes an object and scales said + object down to a specified value on the Kubernetes cluster + + Args: + obj (object): Object of the artifact being modified + namesapce (str): Namespace of the kubernetes cluster to be used + replicates (int): Default 0, size of the amount of replicas to scale + ''' + patch = [{"op": "replace", + "path": "/spec/replicas", + "value": replicas}] + name = self._get_metadata_name(obj) + _, url = self._generate_kurl(obj, namespace, name) + self.api.request("patch", url, data=patch) + logger.info("'%s' successfully scaled to %s" % (name, replicas)) + + def namespaces(self): + ''' + Gathers a list of namespaces on the Kubernetes cluster + ''' + url = urljoin(self.k8s_api, "namespaces") + ns = self.api.request("get", url) + return ns['items'] + + def _generate_kurl(self, obj, namespace, name=None, params=None): + ''' + Generate the required URL by extracting the 'kind' from the + object as well as the namespace. + + Args: + obj (obj): Object of the data being passed + namespace (str): k8s namespace + name (str): Name of the object being passed + params (arr): Extra params passed such as timeout=300 + + Returns: + kind (str): The kind used + url (str): The URL to be used / artifact URL + ''' + if 'kind' not in obj.keys(): + raise KubeKubernetesError("Error processing object. There is no kind") + + kind = obj['kind'] + + resource = KubeBase.kind_to_resource_name(kind) + + if resource in self.k8s_api_resources: + url = self.k8s_api + else: + raise KubeKubernetesError("No kind by that name: %s" % kind) + + url = urljoin(url, "namespaces/%s/%s/" % (namespace, resource)) + + if name: + url = urljoin(url, name) + + if params: + url = urljoin(url, "?%s" % urlencode(params)) + + return (resource, url) + + def _get_metadata_name(self, obj): + ''' + This looks at the object and grabs the metadata name of said object + + Args: + obj (object): Object file of the artifact + + Returns: + name (str): Returns the metadata name of the object + ''' + if "metadata" in obj and \ + "name" in obj["metadata"]: + name = obj["metadata"]["name"] + else: + raise KubeKubernetesError("Cannot undeploy. There is no" + " name in object metadata " + "object=%s" % obj) + return name diff --git a/atomicapp/providers/openshift.py b/atomicapp/providers/openshift.py index 5683ca25..aac11d38 100644 --- a/atomicapp/providers/openshift.py +++ b/atomicapp/providers/openshift.py @@ -39,7 +39,7 @@ PROVIDER_TLS_VERIFY_KEY, PROVIDER_CA_KEY, OPENSHIFT_POD_CA_FILE) -from atomicapp.providers.lib.kubeconfig import KubeConfig +from atomicapp.providers.lib.kubeshift.kubeconfig import KubeConfig from requests.exceptions import SSLError import logging logger = logging.getLogger(LOGGER_DEFAULT) diff --git a/tests/units/nulecule/test_kubeconfig.py b/tests/units/nulecule/test_kubeconfig.py index 974b694e..a0f13601 100644 --- a/tests/units/nulecule/test_kubeconfig.py +++ b/tests/units/nulecule/test_kubeconfig.py @@ -1,6 +1,6 @@ import unittest from atomicapp.plugin import ProviderFailedException -from atomicapp.providers.lib.kubeconfig import KubeConfig +from atomicapp.providers.lib.kubeshift.kubeconfig import KubeConfig class TestKubeConfParsing(unittest.TestCase): diff --git a/tests/units/providers/test_kubernetes_provider.py b/tests/units/providers/test_kubernetes_provider.py index 6afbc323..651ceceb 100644 --- a/tests/units/providers/test_kubernetes_provider.py +++ b/tests/units/providers/test_kubernetes_provider.py @@ -28,9 +28,6 @@ MOCK_CONTENT = "mock_provider_call_content" -def mock_provider_call(self, cmd): - return MOCK_CONTENT - class TestKubernetesProviderBase(unittest.TestCase): # Create a temporary directory for our setup as well as load the required providers @@ -53,7 +50,6 @@ def prepare_provider(self, data): return provider # Check that the provider configuration file exists - @mock.patch.object(KubernetesProvider, '_call', mock_provider_call) def test_provider_config_exist(self): provider_config_path = self.create_temp_file() mock_content = "%s_%s" % (MOCK_CONTENT, "_unchanged")