From f651219dc13a9427369b62faf93d71b64e88614b Mon Sep 17 00:00:00 2001 From: Shawn Hurley Date: Mon, 17 Sep 2018 14:50:18 -0400 Subject: [PATCH 1/4] pkg/ansible: Adding paramconv and kubeconfig to ansible operator. (#471) --- pkg/ansible/paramconv/paramconv.go | 164 +++++++++++++ pkg/ansible/proxy/kubeconfig/kubeconfig.go | 101 ++++++++ pkg/ansible/proxy/kubectl.go | 265 +++++++++++++++++++++ pkg/ansible/proxy/proxy.go | 135 +++++++++++ 4 files changed, 665 insertions(+) create mode 100644 pkg/ansible/paramconv/paramconv.go create mode 100644 pkg/ansible/proxy/kubeconfig/kubeconfig.go create mode 100644 pkg/ansible/proxy/kubectl.go create mode 100644 pkg/ansible/proxy/proxy.go diff --git a/pkg/ansible/paramconv/paramconv.go b/pkg/ansible/paramconv/paramconv.go new file mode 100644 index 0000000000..61e45839e8 --- /dev/null +++ b/pkg/ansible/paramconv/paramconv.go @@ -0,0 +1,164 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Based on https://github.com/iancoleman/strcase + +package paramconv + +import ( + "regexp" + "strings" +) + +var ( + numberSequence = regexp.MustCompile(`([a-zA-Z])(\d+)([a-zA-Z]?)`) + numberReplacement = []byte(`$1 $2 $3`) + wordMapping = map[string]string{ + "http": "HTTP", + "url": "URL", + "ip": "IP", + } +) + +func addWordBoundariesToNumbers(s string) string { + b := []byte(s) + b = numberSequence.ReplaceAll(b, numberReplacement) + return string(b) +} + +func translateWord(word string, initCase bool) string { + if val, ok := wordMapping[word]; ok { + return val + } + if initCase { + return strings.Title(word) + } + return word +} + +// Converts a string to CamelCase +func ToCamel(s string) string { + s = addWordBoundariesToNumbers(s) + s = strings.Trim(s, " ") + n := "" + bits := []string{} + for _, v := range s { + if v == '_' || v == ' ' || v == '-' { + bits = append(bits, n) + n = "" + } else { + n += string(v) + } + } + bits = append(bits, n) + + ret := "" + for i, substr := range bits { + ret += translateWord(substr, i != 0) + } + return ret +} + +// Converts a string to snake_case +func ToSnake(s string) string { + s = addWordBoundariesToNumbers(s) + s = strings.Trim(s, " ") + var prefix string + char1 := []rune(s)[0] + if char1 >= 'A' && char1 <= 'Z' { + prefix = "_" + } else { + prefix = "" + } + bits := []string{} + n := "" + real_i := -1 + + for i, v := range s { + real_i += 1 + // treat acronyms as words, eg for JSONData -> JSON is a whole word + nextCaseIsChanged := false + if i+1 < len(s) { + next := s[i+1] + if (v >= 'A' && v <= 'Z' && next >= 'a' && next <= 'z') || (v >= 'a' && v <= 'z' && next >= 'A' && next <= 'Z') { + nextCaseIsChanged = true + } + } + + if real_i > 0 && n[len(n)-1] != '_' && nextCaseIsChanged { + // add underscore if next letter case type is changed + if v >= 'A' && v <= 'Z' { + bits = append(bits, strings.ToLower(n)) + n = string(v) + real_i = 0 + } else if v >= 'a' && v <= 'z' { + bits = append(bits, strings.ToLower(n+string(v))) + n = "" + real_i = -1 + } + } else if v == ' ' || v == '_' || v == '-' { + // replace spaces/underscores with delimiters + bits = append(bits, strings.ToLower(n)) + n = "" + real_i = -1 + } else { + n = n + string(v) + } + } + bits = append(bits, strings.ToLower(n)) + joined := strings.Join(bits, "_") + if _, ok := wordMapping[bits[0]]; !ok { + return prefix + joined + } + return joined +} + +func convertParameter(fn func(string) string, v interface{}) interface{} { + switch v := v.(type) { + case map[string]interface{}: + ret := map[string]interface{}{} + for key, val := range v { + ret[fn(key)] = convertParameter(fn, val) + } + return ret + case []interface{}: + return convertArray(fn, v) + default: + return v + } +} + +func convertArray(fn func(string) string, in []interface{}) []interface{} { + res := make([]interface{}, len(in)) + for i, v := range in { + res[i] = convertParameter(fn, v) + } + return res +} + +func convertMapKeys(fn func(string) string, in map[string]interface{}) map[string]interface{} { + converted := map[string]interface{}{} + for key, val := range in { + converted[fn(key)] = convertParameter(fn, val) + } + return converted +} + +func MapToSnake(in map[string]interface{}) map[string]interface{} { + return convertMapKeys(ToSnake, in) +} + +func MapToCamel(in map[string]interface{}) map[string]interface{} { + return convertMapKeys(ToCamel, in) +} diff --git a/pkg/ansible/proxy/kubeconfig/kubeconfig.go b/pkg/ansible/proxy/kubeconfig/kubeconfig.go new file mode 100644 index 0000000000..56913ac0f8 --- /dev/null +++ b/pkg/ansible/proxy/kubeconfig/kubeconfig.go @@ -0,0 +1,101 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kubeconfig + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "html/template" + "io/ioutil" + "net/url" + "os" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// kubectl, as of 1.10.5, only does basic auth if the username is present in +// the URL. The python client used by ansible, as of 6.0.0, only does basic +// auth if the username and password are provided under the "user" key within +// "users". +const kubeConfigTemplate = `--- +apiVersion: v1 +kind: Config +clusters: +- cluster: + insecure-skip-tls-verify: true + server: {{.ProxyURL}} + name: proxy-server +contexts: +- context: + cluster: proxy-server + user: admin/proxy-server + name: {{.Namespace}}/proxy-server +current-context: {{.Namespace}}/proxy-server +preferences: {} +users: +- name: admin/proxy-server + user: + username: {{.Username}} + password: unused +` + +// values holds the data used to render the template +type values struct { + Username string + ProxyURL string + Namespace string +} + +// Create renders a kubeconfig template and writes it to disk +func Create(ownerRef metav1.OwnerReference, proxyURL string, namespace string) (*os.File, error) { + parsedURL, err := url.Parse(proxyURL) + if err != nil { + return nil, err + } + ownerRefJSON, err := json.Marshal(ownerRef) + if err != nil { + return nil, err + } + username := base64.URLEncoding.EncodeToString([]byte(ownerRefJSON)) + parsedURL.User = url.User(username) + v := values{ + Username: username, + ProxyURL: parsedURL.String(), + Namespace: namespace, + } + + var parsed bytes.Buffer + + t := template.Must(template.New("kubeconfig").Parse(kubeConfigTemplate)) + t.Execute(&parsed, v) + + file, err := ioutil.TempFile("", "kubeconfig") + if err != nil { + return nil, err + } + // multiple calls to close file will not hurt anything, + // but we don't want to lose the error because we are + // writing to the file, so we will call close twice. + defer file.Close() + + if _, err := file.WriteString(parsed.String()); err != nil { + return nil, err + } + if err := file.Close(); err != nil { + return nil, err + } + return file, nil +} diff --git a/pkg/ansible/proxy/kubectl.go b/pkg/ansible/proxy/kubectl.go new file mode 100644 index 0000000000..3c24243b7e --- /dev/null +++ b/pkg/ansible/proxy/kubectl.go @@ -0,0 +1,265 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// This code was retrieved from +// https://github.com/kubernetes/kubernetes/blob/204d994/pkg/kubectl/proxy/proxy_server.go +// and modified for use in this project. + +package proxy + +import ( + "fmt" + "log" + "net" + "net/http" + "net/url" + "os" + "regexp" + "strings" + "syscall" + "time" + + utilnet "k8s.io/apimachinery/pkg/util/net" + k8sproxy "k8s.io/apimachinery/pkg/util/proxy" + "k8s.io/client-go/rest" + "k8s.io/client-go/transport" +) + +const ( + // DefaultHostAcceptRE is the default value for which hosts to accept. + DefaultHostAcceptRE = "^localhost$,^127\\.0\\.0\\.1$,^\\[::1\\]$" + // DefaultPathAcceptRE is the default path to accept. + DefaultPathAcceptRE = "^.*" + // DefaultPathRejectRE is the default set of paths to reject. + DefaultPathRejectRE = "^/api/.*/pods/.*/exec,^/api/.*/pods/.*/attach" + // DefaultMethodRejectRE is the set of HTTP methods to reject by default. + DefaultMethodRejectRE = "^$" +) + +var ( + // ReverseProxyFlushInterval is the frequency to flush the reverse proxy. + // Only matters for long poll connections like the one used to watch. With an + // interval of 0 the reverse proxy will buffer content sent on any connection + // with transfer-encoding=chunked. + // TODO: Flush after each chunk so the client doesn't suffer a 100ms latency per + // watch event. + ReverseProxyFlushInterval = 100 * time.Millisecond +) + +// FilterServer rejects requests which don't match one of the specified regular expressions +type FilterServer struct { + // Only paths that match this regexp will be accepted + AcceptPaths []*regexp.Regexp + // Paths that match this regexp will be rejected, even if they match the above + RejectPaths []*regexp.Regexp + // Hosts are required to match this list of regexp + AcceptHosts []*regexp.Regexp + // Methods that match this regexp are rejected + RejectMethods []*regexp.Regexp + // The delegate to call to handle accepted requests. + delegate http.Handler +} + +// MakeRegexpArray splits a comma separated list of regexps into an array of Regexp objects. +func MakeRegexpArray(str string) ([]*regexp.Regexp, error) { + parts := strings.Split(str, ",") + result := make([]*regexp.Regexp, len(parts)) + for ix := range parts { + re, err := regexp.Compile(parts[ix]) + if err != nil { + return nil, err + } + result[ix] = re + } + return result, nil +} + +// MakeRegexpArrayOrDie creates an array of regular expression objects from a string or exits. +func MakeRegexpArrayOrDie(str string) []*regexp.Regexp { + result, err := MakeRegexpArray(str) + if err != nil { + log.Fatalf("error compiling re: %v", err) + } + return result +} + +func matchesRegexp(str string, regexps []*regexp.Regexp) bool { + for _, re := range regexps { + if re.MatchString(str) { + log.Printf("%v matched %s", str, re) + return true + } + } + return false +} + +func (f *FilterServer) accept(method, path, host string) bool { + if matchesRegexp(path, f.RejectPaths) { + return false + } + if matchesRegexp(method, f.RejectMethods) { + return false + } + if matchesRegexp(path, f.AcceptPaths) && matchesRegexp(host, f.AcceptHosts) { + return true + } + return false +} + +// HandlerFor makes a shallow copy of f which passes its requests along to the +// new delegate. +func (f *FilterServer) HandlerFor(delegate http.Handler) *FilterServer { + f2 := *f + f2.delegate = delegate + return &f2 +} + +// Get host from a host header value like "localhost" or "localhost:8080" +func extractHost(header string) (host string) { + host, _, err := net.SplitHostPort(header) + if err != nil { + host = header + } + return host +} + +func (f *FilterServer) ServeHTTP(rw http.ResponseWriter, req *http.Request) { + host := extractHost(req.Host) + if f.accept(req.Method, req.URL.Path, host) { + log.Printf("Filter accepting %v %v %v", req.Method, req.URL.Path, host) + f.delegate.ServeHTTP(rw, req) + return + } + log.Printf("Filter rejecting %v %v %v", req.Method, req.URL.Path, host) + rw.WriteHeader(http.StatusForbidden) + rw.Write([]byte("

Unauthorized

")) +} + +// Server is a http.Handler which proxies Kubernetes APIs to remote API server. +type server struct { + Handler http.Handler +} + +type responder struct{} + +func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) { + log.Printf("Error while proxying request: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) +} + +// makeUpgradeTransport creates a transport that explicitly bypasses HTTP2 support +// for proxy connections that must upgrade. +func makeUpgradeTransport(config *rest.Config) (k8sproxy.UpgradeRequestRoundTripper, error) { + transportConfig, err := config.TransportConfig() + if err != nil { + return nil, err + } + tlsConfig, err := transport.TLSConfigFor(transportConfig) + if err != nil { + return nil, err + } + rt := utilnet.SetOldTransportDefaults(&http.Transport{ + TLSClientConfig: tlsConfig, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + }).DialContext, + }) + + upgrader, err := transport.HTTPWrappersForConfig(transportConfig, k8sproxy.MirrorRequest) + if err != nil { + return nil, err + } + return k8sproxy.NewUpgradeRequestRoundTripper(rt, upgrader), nil +} + +// NewServer creates and installs a new Server. +func newServer(apiProxyPrefix string, cfg *rest.Config) (*server, error) { + host := cfg.Host + if !strings.HasSuffix(host, "/") { + host = host + "/" + } + target, err := url.Parse(host) + if err != nil { + return nil, err + } + + responder := &responder{} + transport, err := rest.TransportFor(cfg) + if err != nil { + return nil, err + } + upgradeTransport, err := makeUpgradeTransport(cfg) + if err != nil { + return nil, err + } + proxy := k8sproxy.NewUpgradeAwareHandler(target, transport, false, false, responder) + proxy.UpgradeTransport = upgradeTransport + proxy.UseRequestLocation = true + + proxyServer := http.Handler(proxy) + + if !strings.HasPrefix(apiProxyPrefix, "/api") { + proxyServer = stripLeaveSlash(apiProxyPrefix, proxyServer) + } + + mux := http.NewServeMux() + mux.Handle(apiProxyPrefix, proxyServer) + return &server{Handler: mux}, nil +} + +// Listen is a simple wrapper around net.Listen. +func (s *server) Listen(address string, port int) (net.Listener, error) { + return net.Listen("tcp", fmt.Sprintf("%s:%d", address, port)) +} + +// ListenUnix does net.Listen for a unix socket +func (s *server) ListenUnix(path string) (net.Listener, error) { + // Remove any socket, stale or not, but fall through for other files + fi, err := os.Stat(path) + if err == nil && (fi.Mode()&os.ModeSocket) != 0 { + os.Remove(path) + } + // Default to only user accessible socket, caller can open up later if desired + oldmask := syscall.Umask(0077) + l, err := net.Listen("unix", path) + syscall.Umask(oldmask) + return l, err +} + +// ServeOnListener starts the server using given listener, loops forever. +func (s *server) ServeOnListener(l net.Listener) error { + server := http.Server{ + Handler: s.Handler, + } + return server.Serve(l) +} + +// like http.StripPrefix, but always leaves an initial slash. (so that our +// regexps will work.) +func stripLeaveSlash(prefix string, h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + p := strings.TrimPrefix(req.URL.Path, prefix) + if len(p) >= len(req.URL.Path) { + http.NotFound(w, req) + return + } + if len(p) > 0 && p[:1] != "/" { + p = "/" + p + } + req.URL.Path = p + h.ServeHTTP(w, req) + }) +} diff --git a/pkg/ansible/proxy/proxy.go b/pkg/ansible/proxy/proxy.go new file mode 100644 index 0000000000..58596f22fb --- /dev/null +++ b/pkg/ansible/proxy/proxy.go @@ -0,0 +1,135 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package proxy + +// This file contains this project's custom code, as opposed to kubectl.go +// which contains code retrieved from the kubernetes project. + +import ( + "bytes" + "encoding/base64" + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httputil" + + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/rest" +) + +// InjectOwnerReferenceHandler will handle proxied requests and inject the +// owner refernece found in the authorization header. The Authorization is +// then deleted so that the proxy can re-set with the correct authorization. +func InjectOwnerReferenceHandler(h http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if req.Method == http.MethodPost { + logrus.Info("injecting owner reference") + dump, _ := httputil.DumpRequest(req, false) + logrus.Debugf(string(dump)) + + user, _, ok := req.BasicAuth() + if !ok { + logrus.Error("basic auth header not found") + w.Header().Set("WWW-Authenticate", "Basic realm=\"Operator Proxy\"") + http.Error(w, "", http.StatusUnauthorized) + return + } + authString, err := base64.StdEncoding.DecodeString(user) + if err != nil { + m := "could not base64 decode username" + logrus.Errorf("%s: %s", err.Error()) + http.Error(w, m, http.StatusBadRequest) + return + } + owner := metav1.OwnerReference{} + json.Unmarshal(authString, &owner) + + logrus.Debugf("%#+v", owner) + + body, err := ioutil.ReadAll(req.Body) + if err != nil { + m := "could not read request body" + logrus.Errorf("%s: %s", err.Error()) + http.Error(w, m, http.StatusInternalServerError) + return + } + data := &unstructured.Unstructured{} + err = json.Unmarshal(body, data) + if err != nil { + m := "could not deserialize request body" + logrus.Errorf("%s: %s", err.Error()) + http.Error(w, m, http.StatusBadRequest) + return + } + data.SetOwnerReferences(append(data.GetOwnerReferences(), owner)) + newBody, err := json.Marshal(data.Object) + if err != nil { + m := "could not serialize body" + logrus.Errorf("%s: %s", err.Error()) + http.Error(w, m, http.StatusInternalServerError) + return + } + logrus.Debugf(string(newBody)) + req.Body = ioutil.NopCloser(bytes.NewBuffer(newBody)) + req.ContentLength = int64(len(newBody)) + } + // Removing the authorization so that the proxy can set the correct authorization. + req.Header.Del("Authorization") + h.ServeHTTP(w, req) + }) +} + +// HandlerChain will be used for users to pass defined handlers to the proxy. +// The hander chain will be run after InjectingOwnerReference if it is added +// and before the proxy handler. +type HandlerChain func(http.Handler) http.Handler + +// Options will be used by the user to specify the desired details +// for the proxy. +type Options struct { + Address string + Port int + Handler HandlerChain + NoOwnerInjection bool + KubeConfig *rest.Config +} + +// RunProxy will start a proxy server in a go routine and return on the error +// channel if something is not correct on startup. +func RunProxy(done chan error, o Options) { + server, err := newServer("/", o.KubeConfig) + if err != nil { + done <- err + return + } + if o.Handler != nil { + server.Handler = o.Handler(server.Handler) + } + + if !o.NoOwnerInjection { + server.Handler = InjectOwnerReferenceHandler(server.Handler) + } + l, err := server.Listen(o.Address, o.Port) + if err != nil { + done <- err + return + } + go func() { + logrus.Infof("Starting to serve on %s\n", l.Addr().String()) + done <- server.ServeOnListener(l) + }() +} From 61cd24e6f30a15399f202b86309a2f187584ec4c Mon Sep 17 00:00:00 2001 From: Shawn Hurley Date: Wed, 19 Sep 2018 10:00:46 -0400 Subject: [PATCH 2/4] Adding runner package. (#472) --- pkg/ansible/runner/eventapi/eventapi.go | 184 +++++++++++ pkg/ansible/runner/eventapi/types.go | 76 +++++ .../runner/internal/inputdir/inputdir.go | 123 +++++++ pkg/ansible/runner/runner.go | 303 ++++++++++++++++++ 4 files changed, 686 insertions(+) create mode 100644 pkg/ansible/runner/eventapi/eventapi.go create mode 100644 pkg/ansible/runner/eventapi/types.go create mode 100644 pkg/ansible/runner/internal/inputdir/inputdir.go create mode 100644 pkg/ansible/runner/runner.go diff --git a/pkg/ansible/runner/eventapi/eventapi.go b/pkg/ansible/runner/eventapi/eventapi.go new file mode 100644 index 0000000000..a28e22f3ff --- /dev/null +++ b/pkg/ansible/runner/eventapi/eventapi.go @@ -0,0 +1,184 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventapi + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "strings" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +// EventReceiver serves the event API +type EventReceiver struct { + // Events is the channel used by the event API handler to send JobEvents + // back to the runner, or whatever code is using this receiver. + Events chan JobEvent + + // SocketPath is the path on the filesystem to a unix streaming socket + SocketPath string + + // URLPath is the path portion of the url at which events should be + // received. For example, "/events/" + URLPath string + + // server is the http.Server instance that serves the event API. It must be + // closed. + server io.Closer + + // stopped indicates if this receiver has permanently stopped receiving + // events. When true, requests to POST an event will receive a "410 Gone" + // response, and the body will be ignored. + stopped bool + + // mutex controls access to the "stopped" bool above, ensuring that writes + // are goroutine-safe. + mutex sync.RWMutex + + // ident is the unique identifier for a particular run of ansible-runner + ident string + + // logger holds a logger that has some fields already set + logger logrus.FieldLogger +} + +func New(ident string, errChan chan<- error) (*EventReceiver, error) { + sockPath := fmt.Sprintf("/tmp/ansibleoperator-%s", ident) + listener, err := net.Listen("unix", sockPath) + if err != nil { + return nil, err + } + + rec := EventReceiver{ + Events: make(chan JobEvent, 1000), + SocketPath: sockPath, + URLPath: "/events/", + ident: ident, + logger: logrus.WithFields(logrus.Fields{ + "component": "eventapi", + "job": ident, + }), + } + + mux := http.NewServeMux() + mux.HandleFunc(rec.URLPath, rec.handleEvents) + srv := http.Server{Handler: mux} + rec.server = &srv + + go func() { + errChan <- srv.Serve(listener) + }() + return &rec, nil +} + +// Close ensures that appropriate resources are cleaned up, such as any unix +// streaming socket that may be in use. Close must be called. +func (e *EventReceiver) Close() { + e.mutex.Lock() + e.stopped = true + e.mutex.Unlock() + e.logger.Debug("event API stopped") + e.server.Close() + close(e.Events) +} + +func (e *EventReceiver) handleEvents(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != e.URLPath { + http.NotFound(w, r) + e.logger.WithFields(logrus.Fields{ + "code": "404", + }).Infof("path not found: %s\n", r.URL.Path) + return + } + + if r.Method != http.MethodPost { + e.logger.WithFields(logrus.Fields{ + "code": "405", + }).Infof("method %s not allowed", r.Method) + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + ct := r.Header.Get("content-type") + if strings.Split(ct, ";")[0] != "application/json" { + e.logger.WithFields(logrus.Fields{ + "code": "415", + }).Info("wrong content type: %s", ct) + w.WriteHeader(http.StatusUnsupportedMediaType) + w.Write([]byte("The content-type must be \"application/json\"")) + return + } + + body, err := ioutil.ReadAll(r.Body) + if err != nil { + e.logger.WithFields(logrus.Fields{ + "code": "500", + }).Errorf("%s", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + return + } + + event := JobEvent{} + err = json.Unmarshal(body, &event) + if err != nil { + e.logger.WithFields(logrus.Fields{ + "code": "400", + }).Infof("could not deserialize body: %s", err.Error()) + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("Could not deserialize body as JSON")) + return + } + + // Guarantee that the Events channel will not be written to if stopped == + // true, because in that case the channel has been closed. + e.mutex.RLock() + defer e.mutex.RUnlock() + if e.stopped { + e.mutex.RUnlock() + w.WriteHeader(http.StatusGone) + e.logger.WithFields(logrus.Fields{ + "code": "410", + }).Info("stopped and not accepting additional events for this job") + return + } + // ansible-runner sends "status events" and "ansible events". The "status + // events" signify a change in the state of ansible-runner itself, which + // we're not currently interested in. + // https://ansible-runner.readthedocs.io/en/latest/external_interface.html#event-structure + if event.UUID == "" { + e.logger.Info("dropping event that is not a JobEvent") + } else { + // timeout if the channel blocks for too long + timeout := time.NewTimer(10 * time.Second) + select { + case e.Events <- event: + case <-timeout.C: + e.logger.WithFields(logrus.Fields{ + "code": "500", + }).Warn("timed out writing event to channel") + w.WriteHeader(http.StatusInternalServerError) + return + } + _ = timeout.Stop() + } + w.WriteHeader(http.StatusNoContent) +} diff --git a/pkg/ansible/runner/eventapi/types.go b/pkg/ansible/runner/eventapi/types.go new file mode 100644 index 0000000000..ffbf2ad743 --- /dev/null +++ b/pkg/ansible/runner/eventapi/types.go @@ -0,0 +1,76 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventapi + +import ( + "fmt" + "strings" + "time" +) + +// EventTime - time to unmarshal nano time. +type EventTime struct { + time.Time +} + +// UnmarshalJSON - override unmarshal json. +func (e *EventTime) UnmarshalJSON(b []byte) (err error) { + e.Time, err = time.Parse("2006-01-02T15:04:05.999999999", strings.Trim(string(b[:]), "\"\\")) + if err != nil { + return err + } + return nil +} + +// MarshalJSON - override the marshal json. +func (e EventTime) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("\"%s\"", e.Time.Format("2006-01-02T15:04:05.99999999"))), nil +} + +// JobEvent - event of an ansible run. +type JobEvent struct { + UUID string `json:"uuid"` + Counter int `json:"counter"` + StdOut string `json:"stdout"` + StartLine int `json:"start_line"` + EndLine int `json:"EndLine"` + Event string `json:"event"` + EventData map[string]interface{} `json:"event_data"` + PID int `json:"pid"` + Created EventTime `json:"created"` +} + +// StatusJobEvent - event of an ansible run. +type StatusJobEvent struct { + UUID string `json:"uuid"` + Counter int `json:"counter"` + StdOut string `json:"stdout"` + StartLine int `json:"start_line"` + EndLine int `json:"EndLine"` + Event string `json:"event"` + EventData StatsEventData `json:"event_data"` + PID int `json:"pid"` + Created EventTime `json:"created"` +} + +// StatsEventData - data for a the status event. +type StatsEventData struct { + Playbook string `json:"playbook"` + PlaybookUUID string `json:"playbook_uuid"` + Changed map[string]int `json:"changed"` + Ok map[string]int `json:"ok"` + Failures map[string]int `json:"failures"` + Skipped map[string]int `json:"skipped"` +} diff --git a/pkg/ansible/runner/internal/inputdir/inputdir.go b/pkg/ansible/runner/internal/inputdir/inputdir.go new file mode 100644 index 0000000000..dae6bed5ff --- /dev/null +++ b/pkg/ansible/runner/internal/inputdir/inputdir.go @@ -0,0 +1,123 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inputdir + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path/filepath" + + "github.com/sirupsen/logrus" +) + +// InputDir represents an input directory for ansible-runner. +type InputDir struct { + Path string + PlaybookPath string + Parameters map[string]interface{} + EnvVars map[string]string + Settings map[string]string +} + +// makeDirs creates the required directory structure. +func (i *InputDir) makeDirs() error { + for _, path := range []string{"env", "project", "inventory"} { + fullPath := filepath.Join(i.Path, path) + err := os.MkdirAll(fullPath, os.ModePerm) + if err != nil { + logrus.Errorf("unable to create directory %v", fullPath) + return err + } + } + return nil +} + +// addFile adds a file to the given relative path within the input directory. +func (i *InputDir) addFile(path string, content []byte) error { + fullPath := filepath.Join(i.Path, path) + err := ioutil.WriteFile(fullPath, content, 0644) + if err != nil { + logrus.Errorf("unable to write file %v", fullPath) + } + return err +} + +// Write commits the object's state to the filesystem at i.Path. +func (i *InputDir) Write() error { + paramBytes, err := json.Marshal(i.Parameters) + if err != nil { + return err + } + envVarBytes, err := json.Marshal(i.EnvVars) + if err != nil { + return err + } + settingsBytes, err := json.Marshal(i.Settings) + if err != nil { + return err + } + + err = i.makeDirs() + if err != nil { + return err + } + + err = i.addFile("env/envvars", envVarBytes) + if err != nil { + return err + } + err = i.addFile("env/extravars", paramBytes) + if err != nil { + return err + } + err = i.addFile("env/settings", settingsBytes) + if err != nil { + return err + } + + // If ansible-runner is running in a python virtual environment, propagate + // that to ansible. + venv := os.Getenv("VIRTUAL_ENV") + hosts := "localhost ansible_connection=local" + if venv != "" { + hosts = fmt.Sprintf("%s ansible_python_interpreter=%s", hosts, filepath.Join(venv, "bin/python")) + } + err = i.addFile("inventory/hosts", []byte(hosts)) + if err != nil { + return err + } + + if i.PlaybookPath != "" { + f, err := os.Open(i.PlaybookPath) + if err != nil { + logrus.Errorf("failed to open playbook file %v", i.PlaybookPath) + return err + } + defer f.Close() + + playbookBytes, err := ioutil.ReadAll(f) + if err != nil { + return err + } + + err = i.addFile("project/playbook.yaml", playbookBytes) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/ansible/runner/runner.go b/pkg/ansible/runner/runner.go new file mode 100644 index 0000000000..737050b547 --- /dev/null +++ b/pkg/ansible/runner/runner.go @@ -0,0 +1,303 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runner + +import ( + "errors" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + + "github.com/operator-framework/operator-sdk/pkg/ansible/paramconv" + "github.com/operator-framework/operator-sdk/pkg/ansible/runner/eventapi" + "github.com/operator-framework/operator-sdk/pkg/ansible/runner/internal/inputdir" + "github.com/sirupsen/logrus" + yaml "gopkg.in/yaml.v2" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// Runner - a runnable that should take the parameters and name and namespace +// and run the correct code. +type Runner interface { + Run(*unstructured.Unstructured, string) (chan eventapi.JobEvent, error) + GetFinalizer() (string, bool) +} + +// watch holds data used to create a mapping of GVK to ansible playbook or role. +// The mapping is used to compose an ansible operator. +type watch struct { + Version string `yaml:"version"` + Group string `yaml:"group"` + Kind string `yaml:"kind"` + Playbook string `yaml:"playbook"` + Role string `yaml:"role"` + Finalizer *Finalizer `yaml:"finalizer"` +} + +// Finalizer - Expose finalizer to be used by a user. +type Finalizer struct { + Name string `yaml:"name"` + Playbook string `yaml:"playbook"` + Role string `yaml:"role"` + Vars map[string]interface{} `yaml:"vars"` +} + +// NewFromWatches reads the operator's config file at the provided path. +func NewFromWatches(path string) (map[schema.GroupVersionKind]Runner, error) { + b, err := ioutil.ReadFile(path) + if err != nil { + logrus.Errorf("failed to get config file %v", err) + return nil, err + } + watches := []watch{} + err = yaml.Unmarshal(b, &watches) + if err != nil { + logrus.Errorf("failed to unmarshal config %v", err) + return nil, err + } + + m := map[schema.GroupVersionKind]Runner{} + for _, w := range watches { + s := schema.GroupVersionKind{ + Group: w.Group, + Version: w.Version, + Kind: w.Kind, + } + // Check if schema is a duplicate + if _, ok := m[s]; ok { + return nil, fmt.Errorf("duplicate GVK: %v", s.String()) + } + switch { + case w.Playbook != "": + r, err := NewForPlaybook(w.Playbook, s, w.Finalizer) + if err != nil { + return nil, err + } + m[s] = r + case w.Role != "": + r, err := NewForRole(w.Role, s, w.Finalizer) + if err != nil { + return nil, err + } + m[s] = r + default: + return nil, fmt.Errorf("either playbook or role must be defined for %v", s) + } + } + return m, nil +} + +// NewForPlaybook returns a new Runner based on the path to an ansible playbook. +func NewForPlaybook(path string, gvk schema.GroupVersionKind, finalizer *Finalizer) (Runner, error) { + if !filepath.IsAbs(path) { + return nil, fmt.Errorf("playbook path must be absolute for %v", gvk) + } + r := &runner{ + Path: path, + GVK: gvk, + cmdFunc: func(ident, inputDirPath string) *exec.Cmd { + return exec.Command("ansible-runner", "-vv", "-p", path, "-i", ident, "run", inputDirPath) + }, + } + err := r.addFinalizer(finalizer) + if err != nil { + return nil, err + } + return r, nil +} + +// NewForRole returns a new Runner based on the path to an ansible role. +func NewForRole(path string, gvk schema.GroupVersionKind, finalizer *Finalizer) (Runner, error) { + if !filepath.IsAbs(path) { + return nil, fmt.Errorf("role path must be absolute for %v", gvk) + } + path = strings.TrimRight(path, "/") + r := &runner{ + Path: path, + GVK: gvk, + cmdFunc: func(ident, inputDirPath string) *exec.Cmd { + rolePath, roleName := filepath.Split(path) + return exec.Command("ansible-runner", "-vv", "--role", roleName, "--roles-path", rolePath, "--hosts", "localhost", "-i", ident, "run", inputDirPath) + }, + } + err := r.addFinalizer(finalizer) + if err != nil { + return nil, err + } + return r, nil +} + +// runner - implements the Runner interface for a GVK that's being watched. +type runner struct { + Path string // path on disk to a playbook or role depending on what cmdFunc expects + GVK schema.GroupVersionKind // GVK being watched that corresponds to the Path + Finalizer *Finalizer + cmdFunc func(ident, inputDirPath string) *exec.Cmd // returns a Cmd that runs ansible-runner + finalizerCmdFunc func(ident, inputDirPath string) *exec.Cmd +} + +func (r *runner) Run(u *unstructured.Unstructured, kubeconfig string) (chan eventapi.JobEvent, error) { + if u.GetDeletionTimestamp() != nil && !r.isFinalizerRun(u) { + return nil, errors.New("resource has been deleted, but no finalizer was matched, skipping reconciliation") + } + ident := strconv.Itoa(rand.Int()) + logger := logrus.WithFields(logrus.Fields{ + "component": "runner", + "job": ident, + "name": u.GetName(), + "namespace": u.GetNamespace(), + }) + // start the event receiver. We'll check errChan for an error after + // ansible-runner exits. + errChan := make(chan error, 1) + receiver, err := eventapi.New(ident, errChan) + if err != nil { + return nil, err + } + inputDir := inputdir.InputDir{ + Path: filepath.Join("/tmp/ansible-operator/runner/", r.GVK.Group, r.GVK.Version, r.GVK.Kind, u.GetNamespace(), u.GetName()), + Parameters: r.makeParameters(u), + EnvVars: map[string]string{ + "K8S_AUTH_KUBECONFIG": kubeconfig, + }, + Settings: map[string]string{ + "runner_http_url": receiver.SocketPath, + "runner_http_path": receiver.URLPath, + }, + } + // If Path is a dir, assume it is a role path. Otherwise assume it's a + // playbook path + fi, err := os.Lstat(r.Path) + if err != nil { + return nil, err + } + if !fi.IsDir() { + inputDir.PlaybookPath = r.Path + } + err = inputDir.Write() + if err != nil { + return nil, err + } + + go func() { + var dc *exec.Cmd + if r.isFinalizerRun(u) { + logger.Debugf("Resource is marked for deletion, running finalizer %s", r.Finalizer.Name) + dc = r.finalizerCmdFunc(ident, inputDir.Path) + } else { + dc = r.cmdFunc(ident, inputDir.Path) + } + + err := dc.Run() + if err != nil { + logger.Errorf("error from ansible-runner: %s", err.Error()) + } else { + logger.Info("ansible-runner exited successfully") + } + + receiver.Close() + err = <-errChan + // http.Server returns this in the case of being closed cleanly + if err != nil && err != http.ErrServerClosed { + logger.Errorf("error from event api: %s", err.Error()) + } + }() + return receiver.Events, nil +} + +func (r *runner) GetFinalizer() (string, bool) { + if r.Finalizer != nil { + return r.Finalizer.Name, true + } + return "", false +} + +func (r *runner) isFinalizerRun(u *unstructured.Unstructured) bool { + finalizersSet := r.Finalizer != nil && u.GetFinalizers() != nil + // The resource is deleted and our finalizer is present, we need to run the finalizer + if finalizersSet && u.GetDeletionTimestamp() != nil { + for _, f := range u.GetFinalizers() { + if f == r.Finalizer.Name { + return true + } + } + } + return false +} + +func (r *runner) addFinalizer(finalizer *Finalizer) error { + r.Finalizer = finalizer + switch { + case finalizer == nil: + return nil + case finalizer.Playbook != "": + if !filepath.IsAbs(finalizer.Playbook) { + return fmt.Errorf("finalizer playbook path must be absolute for %v", r.GVK) + } + r.finalizerCmdFunc = func(ident, inputDirPath string) *exec.Cmd { + return exec.Command("ansible-runner", "-vv", "-p", finalizer.Playbook, "-i", ident, "run", inputDirPath) + } + case finalizer.Role != "": + if !filepath.IsAbs(finalizer.Role) { + return fmt.Errorf("finalizer role path must be absolute for %v", r.GVK) + } + r.finalizerCmdFunc = func(ident, inputDirPath string) *exec.Cmd { + path := strings.TrimRight(finalizer.Role, "/") + rolePath, roleName := filepath.Split(path) + return exec.Command("ansible-runner", "-vv", "--role", roleName, "--roles-path", rolePath, "--hosts", "localhost", "-i", ident, "run", inputDirPath) + } + case len(finalizer.Vars) != 0: + r.finalizerCmdFunc = r.cmdFunc + } + return nil +} + +// makeParameters - creates the extravars parameters for ansible +// The resulting structure in json is: +// { "meta": { +// "name": , +// "namespace": , +// }, +// , +// ... +// __: { +// Date: Thu, 27 Sep 2018 08:05:16 -0400 Subject: [PATCH 3/4] pkg/ansible: Adding ansible operator controller and events package (#473) --- Gopkg.lock | 161 +++++++++++++++++++++++-- pkg/ansible/controller/controller.go | 91 ++++++++++++++ pkg/ansible/controller/reconcile.go | 173 +++++++++++++++++++++++++++ pkg/ansible/controller/source.go | 78 ++++++++++++ pkg/ansible/controller/types.go | 125 +++++++++++++++++++ pkg/ansible/events/log_events.go | 73 +++++++++++ 6 files changed, 689 insertions(+), 12 deletions(-) create mode 100644 pkg/ansible/controller/controller.go create mode 100644 pkg/ansible/controller/reconcile.go create mode 100644 pkg/ansible/controller/source.go create mode 100644 pkg/ansible/controller/types.go create mode 100644 pkg/ansible/events/log_events.go diff --git a/Gopkg.lock b/Gopkg.lock index 4b837a0e47..11e1ac485b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -25,6 +25,22 @@ revision = "0ca9ea5df5451ffdf184b4428c902747c2c11cd7" version = "v1.0.0" +[[projects]] + branch = "master" + digest = "1:65587005c6fa4293c0b8a2e457e689df7fda48cc5e1f5449ea2c1e7784551558" + name = "github.com/go-logr/logr" + packages = ["."] + pruneopts = "" + revision = "9fb12b3b21c5415d16ac18dc5cd42c1cfdd40c4e" + +[[projects]] + branch = "master" + digest = "1:ce43ad4015e7cdad3f0e8f2c8339439dd4470859a828d2a6988b0f713699e94a" + name = "github.com/go-logr/zapr" + packages = ["."] + pruneopts = "" + revision = "7536572e8d55209135cd5e7ccf7fce43dca217ab" + [[projects]] digest = "1:6d49a51383769ec8b1835bd0bea6c446057b109d2676bb28a20264577a1bb74b" name = "github.com/gobuffalo/envy" @@ -52,6 +68,14 @@ pruneopts = "" revision = "23def4e6c14b4da8ac2ed8007337bc5eb5007998" +[[projects]] + branch = "master" + digest = "1:9854532d7b2fee9414d4fcd8d8bccd6b1c1e1663d8ec0337af63a19aaf4a778e" + name = "github.com/golang/groupcache" + packages = ["lru"] + pruneopts = "" + revision = "6f2cf27854a4a29e3811b0371547be335d411b8b" + [[projects]] digest = "1:3dd078fda7500c341bc26cfbc6c6a34614f295a2457149fc1045cab767cbcf18" name = "github.com/golang/protobuf" @@ -82,6 +106,14 @@ pruneopts = "" revision = "24818f796faf91cd76ec7bddd72458fbced7a6c1" +[[projects]] + digest = "1:5247b135b5492aa232a731acdcb52b08f32b874cb398f21ab460396eadbe866b" + name = "github.com/google/uuid" + packages = ["."] + pruneopts = "" + revision = "d460ce9f8df2e77fb1ba55ca87fafed96c607494" + version = "v1.0.0" + [[projects]] digest = "1:16b2837c8b3cf045fa2cdc82af0cf78b19582701394484ae76b2c3bc3c99ad73" name = "github.com/googleapis/gnostic" @@ -157,12 +189,20 @@ revision = "b729f2633dfe35f4d1d8a32385f6685610ce1cb5" [[projects]] - digest = "1:ad2a7161b6c8e1f1514c8eb1c75eb17a7b9943e11187229e55d796242be10c6b" + digest = "1:3d8a9dd6693e55d63b28634d66d11d647cd0b65362dedc18d686db01aa5f8678" name = "github.com/markbates/inflect" packages = ["."] pruneopts = "" - revision = "dd7de90c06bca70f18136e59dec2270c19a401e7" - version = "v1.0.0" + revision = "28bf78dadb0f64748ff13a0b6547e4972a5cea64" + version = "v1.0.1" + +[[projects]] + branch = "master" + digest = "1:58050e2bc9621cc6b68c1da3e4a0d1c40ad1f89062b9855c26521fd42a97a106" + name = "github.com/mattbaird/jsonpatch" + packages = ["."] + pruneopts = "" + revision = "81af80346b1a01caae0cbc27fd3c1ba5b11e189f" [[projects]] digest = "1:63722a4b1e1717be7b98fc686e0b30d5e7f734b9e93d7dee86293b6deab7ea28" @@ -188,6 +228,22 @@ revision = "4b7aa43c6742a2c18fdef89dd197aaae7dac7ccd" version = "1.0.1" +[[projects]] + branch = "master" + digest = "1:d33ce379780d7c43405b9251289493cabada82f6bf9ab35eea6915d04f6ac8e0" + name = "github.com/mxk/go-flowrate" + packages = ["flowrate"] + pruneopts = "" + revision = "cca7078d478f8520f85629ad7c68962d31ed7682" + +[[projects]] + digest = "1:a5484d4fa43127138ae6e7b2299a6a52ae006c7f803d98d717f60abf3e97192e" + name = "github.com/pborman/uuid" + packages = ["."] + pruneopts = "" + revision = "adf5a7427709b9deb95d29d3fa8a2bf9cfd388f1" + version = "v1.2" + [[projects]] branch = "master" digest = "1:c24598ffeadd2762552269271b3b1510df2d83ee6696c1e543a0ff653af494bc" @@ -273,45 +329,78 @@ version = "v0.0.3" [[projects]] - digest = "1:0a52bcb568386d98f4894575d53ce3e456f56471de6897bb8b9de13c33d9340e" + digest = "1:cbaf13cdbfef0e4734ed8a7504f57fe893d471d62a35b982bf6fb3f036449a66" name = "github.com/spf13/pflag" packages = ["."] pruneopts = "" - revision = "9a97c102cda95a86cec2345a6f09f55a939babf5" - version = "v1.0.2" + revision = "298182f68c66c05229eb03ac171abe6e309ee79a" + version = "v1.0.3" + +[[projects]] + digest = "1:74f86c458e82e1c4efbab95233e0cf51b7cc02dc03193be9f62cd81224e10401" + name = "go.uber.org/atomic" + packages = ["."] + pruneopts = "" + revision = "1ea20fb1cbb1cc08cbd0d913a96dead89aa18289" + version = "v1.3.2" + +[[projects]] + digest = "1:22c7effcb4da0eacb2bb1940ee173fac010e9ef3c691f5de4b524d538bd980f5" + name = "go.uber.org/multierr" + packages = ["."] + pruneopts = "" + revision = "3c4937480c32f4c13a875a1829af76c98ca3d40a" + version = "v1.1.0" + +[[projects]] + digest = "1:246f378f80fba6fcf0f191c486b6613265abd2bc0f2fa55a36b928c67352021e" + name = "go.uber.org/zap" + packages = [ + ".", + "buffer", + "internal/bufferpool", + "internal/color", + "internal/exit", + "zapcore", + ] + pruneopts = "" + revision = "ff33455a0e382e8a81d14dd7c922020b6b5e7982" + version = "v1.9.1" [[projects]] branch = "master" - digest = "1:d10ba2393047dc9ba61df4d36c849a03017175edfa3ee4da23c3a12e10326443" + digest = "1:6914c49eed986dfb8dffb33516fa129c49929d4d873f41e073c83c11c372b870" name = "golang.org/x/crypto" packages = ["ssh/terminal"] pruneopts = "" - revision = "5295e8364332db77d75fce11f1d19c053919a9c9" + revision = "e3636079e1a4c1f337f212cc5cd2aca108f6c900" [[projects]] branch = "master" - digest = "1:08e41d63f8dac84d83797368b56cf0b339e42d0224e5e56668963c28aec95685" + digest = "1:db4fdf03f745823eff6c2f06cd50eb447a7b77f307079cf5220e1621a1d63593" name = "golang.org/x/net" packages = [ "context", + "html", + "html/atom", "http/httpguts", "http2", "http2/hpack", "idna", ] pruneopts = "" - revision = "4dfa2610cdf3b287375bbba5b8f2a14d3b01d8de" + revision = "f5e5bdd778241bfefa8627f7124c39cd6ad8d74f" [[projects]] branch = "master" - digest = "1:149a432fabebb8221a80f77731b1cd63597197ded4f14af606ebe3a0959004ec" + digest = "1:69e22fc503ccbebfa61f4eb1799d905b7100f86458a62589103541f3dd15dd36" name = "golang.org/x/sys" packages = [ "unix", "windows", ] pruneopts = "" - revision = "e4b3c5e9061176387e7cea65e4dc5853801f3fb7" + revision = "af653ce8b74f808d092db8ca9741fbb63d2a469d" [[projects]] digest = "1:5acd3512b047305d49e8763eef7ba423901e85d5dd2fd1e71778a0ea8de10bd4" @@ -364,6 +453,7 @@ digest = "1:2fe7efa9ea3052443378383d27c15ba088d03babe69a89815ce7fe9ec1d9aeb4" name = "k8s.io/api" packages = [ + "admission/v1beta1", "admissionregistration/v1alpha1", "admissionregistration/v1beta1", "apps/v1", @@ -442,17 +532,24 @@ "pkg/util/diff", "pkg/util/errors", "pkg/util/framer", + "pkg/util/httpstream", "pkg/util/intstr", "pkg/util/json", + "pkg/util/mergepatch", "pkg/util/net", + "pkg/util/proxy", "pkg/util/runtime", "pkg/util/sets", + "pkg/util/strategicpatch", + "pkg/util/uuid", "pkg/util/validation", "pkg/util/validation/field", "pkg/util/wait", "pkg/util/yaml", "pkg/version", "pkg/watch", + "third_party/forked/golang/json", + "third_party/forked/golang/netutil", "third_party/forked/golang/reflect", ] pruneopts = "" @@ -511,8 +608,11 @@ "tools/clientcmd/api", "tools/clientcmd/api/latest", "tools/clientcmd/api/v1", + "tools/leaderelection", + "tools/leaderelection/resourcelock", "tools/metrics", "tools/pager", + "tools/record", "tools/reference", "transport", "util/buffer", @@ -528,12 +628,40 @@ revision = "1f13a808da65775f22cbf47862c4e5898d8f4ca1" version = "kubernetes-1.11.2" +[[projects]] + branch = "master" + digest = "1:7b06ff480fd71dead51f0f243b573c448c372ec086b790ec7ed4f8a78f2c1cbf" + name = "k8s.io/kube-openapi" + packages = ["pkg/util/proto"] + pruneopts = "" + revision = "9dfdf9be683f61f82cda12362c44c784e0778b56" + [[projects]] digest = "1:6cad2468c5831529b860a01f09032f6ff38202bc4f76332ef7ad74a993e4aa5a" name = "sigs.k8s.io/controller-runtime" packages = [ + "pkg/cache", + "pkg/cache/internal", "pkg/client", "pkg/client/apiutil", + "pkg/controller", + "pkg/event", + "pkg/handler", + "pkg/internal/controller", + "pkg/internal/recorder", + "pkg/leaderelection", + "pkg/manager", + "pkg/patch", + "pkg/predicate", + "pkg/reconcile", + "pkg/recorder", + "pkg/runtime/inject", + "pkg/runtime/log", + "pkg/source", + "pkg/source/internal", + "pkg/webhook/admission", + "pkg/webhook/admission/types", + "pkg/webhook/types", ] pruneopts = "" revision = "53fc44b56078cd095b11bd44cfa0288ee4cf718f" @@ -564,6 +692,8 @@ "k8s.io/apimachinery/pkg/runtime/serializer", "k8s.io/apimachinery/pkg/types", "k8s.io/apimachinery/pkg/util/intstr", + "k8s.io/apimachinery/pkg/util/net", + "k8s.io/apimachinery/pkg/util/proxy", "k8s.io/apimachinery/pkg/util/wait", "k8s.io/apimachinery/pkg/watch", "k8s.io/client-go/discovery/cached", @@ -574,8 +704,15 @@ "k8s.io/client-go/restmapper", "k8s.io/client-go/tools/cache", "k8s.io/client-go/tools/clientcmd", + "k8s.io/client-go/transport", "k8s.io/client-go/util/workqueue", "sigs.k8s.io/controller-runtime/pkg/client", + "sigs.k8s.io/controller-runtime/pkg/controller", + "sigs.k8s.io/controller-runtime/pkg/event", + "sigs.k8s.io/controller-runtime/pkg/handler", + "sigs.k8s.io/controller-runtime/pkg/manager", + "sigs.k8s.io/controller-runtime/pkg/reconcile", + "sigs.k8s.io/controller-runtime/pkg/source", ] solver-name = "gps-cdcl" solver-version = 1 diff --git a/pkg/ansible/controller/controller.go b/pkg/ansible/controller/controller.go new file mode 100644 index 0000000000..51b1b14eff --- /dev/null +++ b/pkg/ansible/controller/controller.go @@ -0,0 +1,91 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "fmt" + "log" + "strings" + "time" + + "github.com/operator-framework/operator-sdk/pkg/ansible/events" + "github.com/operator-framework/operator-sdk/pkg/ansible/runner" + + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/controller" + crthandler "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// Options - options for your controller +type Options struct { + EventHandlers []events.EventHandler + LoggingLevel events.LogLevel + Runner runner.Runner + Namespace string + GVK schema.GroupVersionKind + // StopChannel is used to deal with the bug: + // https://github.com/kubernetes-sigs/controller-runtime/issues/103 + StopChannel <-chan struct{} +} + +// Add - Creates a new ansible operator controller and adds it to the manager +func Add(mgr manager.Manager, options Options) { + logrus.Infof("Watching %s/%v, %s, %s", options.GVK.Group, options.GVK.Version, options.GVK.Kind, options.Namespace) + if options.EventHandlers == nil { + options.EventHandlers = []events.EventHandler{} + } + eventHandlers := append(options.EventHandlers, events.NewLoggingEventHandler(options.LoggingLevel)) + + aor := &AnsibleOperatorReconciler{ + Client: mgr.GetClient(), + GVK: options.GVK, + Runner: options.Runner, + EventHandlers: eventHandlers, + } + + // Register the GVK with the schema + mgr.GetScheme().AddKnownTypeWithName(options.GVK, &unstructured.Unstructured{}) + metav1.AddToGroupVersion(mgr.GetScheme(), schema.GroupVersion{ + Group: options.GVK.Group, + Version: options.GVK.Version, + }) + + //Create new controller runtime controller and set the controller to watch GVK. + c, err := controller.New(fmt.Sprintf("%v-controller", strings.ToLower(options.GVK.Kind)), mgr, controller.Options{ + Reconciler: aor, + }) + if err != nil { + log.Fatal(err) + } + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(options.GVK) + if err := c.Watch(&source.Kind{Type: u}, &crthandler.EnqueueRequestForObject{}); err != nil { + log.Fatal(err) + } + + r := NewReconcileLoop(time.Minute*1, options.GVK, mgr.GetClient()) + r.Stop = options.StopChannel + cs := &source.Channel{Source: r.Source} + cs.InjectStopChannel(options.StopChannel) + if err := c.Watch(cs, &crthandler.EnqueueRequestForObject{}); err != nil { + log.Fatal(err) + } + r.Start() +} diff --git a/pkg/ansible/controller/reconcile.go b/pkg/ansible/controller/reconcile.go new file mode 100644 index 0000000000..0db01ab103 --- /dev/null +++ b/pkg/ansible/controller/reconcile.go @@ -0,0 +1,173 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "encoding/json" + "errors" + "os" + + "github.com/operator-framework/operator-sdk/pkg/ansible/events" + "github.com/operator-framework/operator-sdk/pkg/ansible/proxy/kubeconfig" + "github.com/operator-framework/operator-sdk/pkg/ansible/runner" + "github.com/operator-framework/operator-sdk/pkg/ansible/runner/eventapi" + + "github.com/sirupsen/logrus" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +// AnsibleOperatorReconciler - object to reconcile runner requests +type AnsibleOperatorReconciler struct { + GVK schema.GroupVersionKind + Runner runner.Runner + Client client.Client + EventHandlers []events.EventHandler +} + +// Reconcile - handle the event. +func (r *AnsibleOperatorReconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + u := &unstructured.Unstructured{} + u.SetGroupVersionKind(r.GVK) + err := r.Client.Get(context.TODO(), request.NamespacedName, u) + if apierrors.IsNotFound(err) { + return reconcile.Result{}, nil + } + if err != nil { + return reconcile.Result{}, err + } + + deleted := u.GetDeletionTimestamp() != nil + finalizer, finalizerExists := r.Runner.GetFinalizer() + pendingFinalizers := u.GetFinalizers() + // If the resource is being deleted we don't want to add the finalizer again + if finalizerExists && !deleted && !contains(pendingFinalizers, finalizer) { + logrus.Debugf("Adding finalizer %s to resource", finalizer) + finalizers := append(pendingFinalizers, finalizer) + u.SetFinalizers(finalizers) + err := r.Client.Update(context.TODO(), u) + return reconcile.Result{}, err + } + if !contains(pendingFinalizers, finalizer) && deleted { + logrus.Info("Resource is terminated, skipping reconcilation") + return reconcile.Result{}, nil + } + + s := u.Object["spec"] + _, ok := s.(map[string]interface{}) + if !ok { + logrus.Warnf("spec was not found") + u.Object["spec"] = map[string]interface{}{} + r.Client.Update(context.TODO(), u) + return reconcile.Result{Requeue: true}, nil + } + ownerRef := metav1.OwnerReference{ + APIVersion: u.GetAPIVersion(), + Kind: u.GetKind(), + Name: u.GetName(), + UID: u.GetUID(), + } + + kc, err := kubeconfig.Create(ownerRef, "http://localhost:8888", u.GetNamespace()) + if err != nil { + return reconcile.Result{}, err + } + defer os.Remove(kc.Name()) + eventChan, err := r.Runner.Run(u, kc.Name()) + if err != nil { + return reconcile.Result{}, err + } + + // iterate events from ansible, looking for the final one + statusEvent := eventapi.StatusJobEvent{} + for event := range eventChan { + for _, eHandler := range r.EventHandlers { + go eHandler.Handle(u, event) + } + if event.Event == "playbook_on_stats" { + // convert to StatusJobEvent; would love a better way to do this + data, err := json.Marshal(event) + if err != nil { + return reconcile.Result{}, err + } + err = json.Unmarshal(data, &statusEvent) + if err != nil { + return reconcile.Result{}, err + } + } + } + if statusEvent.Event == "" { + err := errors.New("did not receive playbook_on_stats event") + logrus.Error(err.Error()) + return reconcile.Result{}, err + } + + // We only want to update the CustomResource once, so we'll track changes and do it at the end + var needsUpdate bool + runSuccessful := true + for _, count := range statusEvent.EventData.Failures { + if count > 0 { + runSuccessful = false + break + } + } + // The finalizer has run successfully, time to remove it + if deleted && finalizerExists && runSuccessful { + finalizers := []string{} + for _, pendingFinalizer := range pendingFinalizers { + if pendingFinalizer != finalizer { + finalizers = append(finalizers, pendingFinalizer) + } + } + u.SetFinalizers(finalizers) + needsUpdate = true + } + + statusMap, ok := u.Object["status"].(map[string]interface{}) + if !ok { + u.Object["status"] = ResourceStatus{ + Status: NewStatusFromStatusJobEvent(statusEvent), + } + logrus.Infof("adding status for the first time") + needsUpdate = true + } else { + // Need to conver the map[string]interface into a resource status. + if update, status := UpdateResourceStatus(statusMap, statusEvent); update { + u.Object["status"] = status + needsUpdate = true + } + } + if needsUpdate { + err = r.Client.Update(context.TODO(), u) + } + if !runSuccessful { + return reconcile.Result{Requeue: true}, err + } + return reconcile.Result{}, err +} + +func contains(l []string, s string) bool { + for _, elem := range l { + if elem == s { + return true + } + } + return false +} diff --git a/pkg/ansible/controller/source.go b/pkg/ansible/controller/source.go new file mode 100644 index 0000000000..f8556d7d86 --- /dev/null +++ b/pkg/ansible/controller/source.go @@ -0,0 +1,78 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "time" + + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" +) + +// ReconcileLoop - new loop +type ReconcileLoop struct { + Source chan event.GenericEvent + Stop <-chan struct{} + GVK schema.GroupVersionKind + Interval time.Duration + Client client.Client +} + +// NewReconcileLoop - loop for a GVK. +// The reconcilation loop is needed because the resync period +// for the informer is not suitable for this use case. +func NewReconcileLoop(interval time.Duration, gvk schema.GroupVersionKind, c client.Client) ReconcileLoop { + s := make(chan event.GenericEvent, 1025) + return ReconcileLoop{ + Source: s, + GVK: gvk, + Interval: interval, + Client: c, + } +} + +// Start - start the reconcile loop +func (r *ReconcileLoop) Start() { + go func() { + ticker := time.NewTicker(r.Interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + // List all object for the GVK + ul := &unstructured.UnstructuredList{} + ul.SetGroupVersionKind(r.GVK) + err := r.Client.List(context.Background(), nil, ul) + if err != nil { + logrus.Warningf("unable to list resources for GV: %v during reconcilation", r.GVK) + continue + } + for _, u := range ul.Items { + e := event.GenericEvent{ + Meta: &u, + Object: &u, + } + r.Source <- e + } + case <-r.Stop: + return + } + } + }() +} diff --git a/pkg/ansible/controller/types.go b/pkg/ansible/controller/types.go new file mode 100644 index 0000000000..1dd148bbd2 --- /dev/null +++ b/pkg/ansible/controller/types.go @@ -0,0 +1,125 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "github.com/operator-framework/operator-sdk/pkg/ansible/runner/eventapi" +) + +const ( + host = "localhost" +) + +type Status struct { + Ok int `json:"ok"` + Changed int `json:"changed"` + Skipped int `json:"skipped"` + Failures int `json:"failures"` + TimeOfCompletion eventapi.EventTime `json:"completion"` +} + +func NewStatusFromStatusJobEvent(je eventapi.StatusJobEvent) Status { + // ok events. + o := 0 + changed := 0 + skipped := 0 + failures := 0 + if v, ok := je.EventData.Changed[host]; ok { + changed = v + } + if v, ok := je.EventData.Ok[host]; ok { + o = v + } + if v, ok := je.EventData.Skipped[host]; ok { + skipped = v + } + if v, ok := je.EventData.Failures[host]; ok { + failures = v + } + return Status{ + Ok: o, + Changed: changed, + Skipped: skipped, + Failures: failures, + TimeOfCompletion: je.Created, + } +} + +func IsStatusEqual(s1, s2 Status) bool { + return (s1.Ok == s2.Ok && s1.Changed == s2.Changed && s1.Skipped == s2.Skipped && s1.Failures == s2.Failures) +} + +func NewStatusFromMap(sm map[string]interface{}) Status { + //Create Old top level status + // ok events. + o := 0 + changed := 0 + skipped := 0 + failures := 0 + e := eventapi.EventTime{} + if v, ok := sm["changed"]; ok { + changed = int(v.(int64)) + } + if v, ok := sm["ok"]; ok { + o = int(v.(int64)) + } + if v, ok := sm["skipped"]; ok { + skipped = int(v.(int64)) + } + if v, ok := sm["failures"]; ok { + failures = int(v.(int64)) + } + if v, ok := sm["completion"]; ok { + s := v.(string) + e.UnmarshalJSON([]byte(s)) + } + return Status{ + Ok: o, + Changed: changed, + Skipped: skipped, + Failures: failures, + TimeOfCompletion: e, + } +} + +type ResourceStatus struct { + Status `json:",inline"` + FailureMessage string `json:"reason,omitempty"` + History []Status `json:"history,omitempty"` +} + +func UpdateResourceStatus(sm map[string]interface{}, je eventapi.StatusJobEvent) (bool, ResourceStatus) { + newStatus := NewStatusFromStatusJobEvent(je) + oldStatus := NewStatusFromMap(sm) + // Don't update the status if new status and old status are equal. + if IsStatusEqual(newStatus, oldStatus) { + return false, ResourceStatus{} + } + + history := []Status{} + h, ok := sm["history"] + if ok { + hi := h.([]interface{}) + for _, m := range hi { + ma := m.(map[string]interface{}) + history = append(history, NewStatusFromMap(ma)) + } + } + history = append(history, oldStatus) + return true, ResourceStatus{ + Status: newStatus, + History: history, + } +} diff --git a/pkg/ansible/events/log_events.go b/pkg/ansible/events/log_events.go new file mode 100644 index 0000000000..37422c106b --- /dev/null +++ b/pkg/ansible/events/log_events.go @@ -0,0 +1,73 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package events + +import ( + "github.com/operator-framework/operator-sdk/pkg/ansible/runner/eventapi" + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// LogLevel - Levelt for the logging to take place. +type LogLevel int + +const ( + // Tasks - only log the high level tasks. + Tasks LogLevel = iota + + // Everything - log every event. + Everything + + // Nothing - this will log nothing. + Nothing +) + +// EventHandler - knows how to handle job events. +type EventHandler interface { + Handle(*unstructured.Unstructured, eventapi.JobEvent) +} + +type loggingEventHandler struct { + LogLevel LogLevel +} + +func (l loggingEventHandler) Handle(u *unstructured.Unstructured, e eventapi.JobEvent) { + log := logrus.WithFields(logrus.Fields{ + "component": "logging_event_handler", + "name": u.GetName(), + "namespace": u.GetNamespace(), + "gvk": u.GroupVersionKind().String(), + "event_type": e.Event, + }) + t, ok := e.EventData["task"] + if ok { + log = log.WithField("task", t) + } + switch l.LogLevel { + case Everything: + log.Infof("event: %#v", e.EventData) + case Tasks: + if ok { + log.Infof("event: %#v", e.EventData) + } + } +} + +// NewLoggingEventHandler - Creates a Logging Event Handler to log events. +func NewLoggingEventHandler(l LogLevel) EventHandler { + return loggingEventHandler{ + LogLevel: l, + } +} From 9b4912b575dc99b66249d21d96bbe819ad95c980 Mon Sep 17 00:00:00 2001 From: John Kim Date: Thu, 27 Sep 2018 16:20:33 -0400 Subject: [PATCH 4/4] make ansible task log outputs more readable (#545) --- pkg/ansible/events/log_events.go | 38 ++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/pkg/ansible/events/log_events.go b/pkg/ansible/events/log_events.go index 37422c106b..629dbb0b29 100644 --- a/pkg/ansible/events/log_events.go +++ b/pkg/ansible/events/log_events.go @@ -32,6 +32,15 @@ const ( // Nothing - this will log nothing. Nothing + + // Ansible Events + EventPlaybookOnTaskStart = "playbook_on_task_start" + EventRunnerOnOk = "runner_on_ok" + EventRunnerOnFailed = "runner_on_failed" + + // Ansible Task Actions + TaskActionSetFact = "set_fact" + TaskActionDebug = "debug" ) // EventHandler - knows how to handle job events. @@ -51,17 +60,32 @@ func (l loggingEventHandler) Handle(u *unstructured.Unstructured, e eventapi.Job "gvk": u.GroupVersionKind().String(), "event_type": e.Event, }) + if l.LogLevel == Nothing { + return + } + // log only the following for the 'Tasks' LogLevel t, ok := e.EventData["task"] if ok { - log = log.WithField("task", t) + setFactAction := e.EventData["task_action"] == TaskActionSetFact + debugAction := e.EventData["task_action"] == TaskActionDebug + + if e.Event == EventPlaybookOnTaskStart && !setFactAction && !debugAction { + log.Infof("[playbook task]: %s", e.EventData["name"]) + return + } + if e.Event == EventRunnerOnOk && debugAction { + log.Infof("[playbook debug]: %v", e.EventData["task_args"]) + return + } + if e.Event == EventRunnerOnFailed { + log.Errorf("[failed]: [playbook task] '%s' failed with task_args - %v", + t, e.EventData["task_args"]) + return + } } - switch l.LogLevel { - case Everything: + // log everything else for the 'Everything' LogLevel + if l.LogLevel == Everything { log.Infof("event: %#v", e.EventData) - case Tasks: - if ok { - log.Infof("event: %#v", e.EventData) - } } }