diff --git a/pkg/ansible/runner/eventapi/eventapi.go b/pkg/ansible/runner/eventapi/eventapi.go new file mode 100644 index 0000000000..a28e22f3ff --- /dev/null +++ b/pkg/ansible/runner/eventapi/eventapi.go @@ -0,0 +1,184 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventapi + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "strings" + "sync" + "time" + + "github.com/sirupsen/logrus" +) + +// EventReceiver serves the event API +type EventReceiver struct { + // Events is the channel used by the event API handler to send JobEvents + // back to the runner, or whatever code is using this receiver. + Events chan JobEvent + + // SocketPath is the path on the filesystem to a unix streaming socket + SocketPath string + + // URLPath is the path portion of the url at which events should be + // received. For example, "/events/" + URLPath string + + // server is the http.Server instance that serves the event API. It must be + // closed. + server io.Closer + + // stopped indicates if this receiver has permanently stopped receiving + // events. When true, requests to POST an event will receive a "410 Gone" + // response, and the body will be ignored. + stopped bool + + // mutex controls access to the "stopped" bool above, ensuring that writes + // are goroutine-safe. + mutex sync.RWMutex + + // ident is the unique identifier for a particular run of ansible-runner + ident string + + // logger holds a logger that has some fields already set + logger logrus.FieldLogger +} + +func New(ident string, errChan chan<- error) (*EventReceiver, error) { + sockPath := fmt.Sprintf("/tmp/ansibleoperator-%s", ident) + listener, err := net.Listen("unix", sockPath) + if err != nil { + return nil, err + } + + rec := EventReceiver{ + Events: make(chan JobEvent, 1000), + SocketPath: sockPath, + URLPath: "/events/", + ident: ident, + logger: logrus.WithFields(logrus.Fields{ + "component": "eventapi", + "job": ident, + }), + } + + mux := http.NewServeMux() + mux.HandleFunc(rec.URLPath, rec.handleEvents) + srv := http.Server{Handler: mux} + rec.server = &srv + + go func() { + errChan <- srv.Serve(listener) + }() + return &rec, nil +} + +// Close ensures that appropriate resources are cleaned up, such as any unix +// streaming socket that may be in use. Close must be called. +func (e *EventReceiver) Close() { + e.mutex.Lock() + e.stopped = true + e.mutex.Unlock() + e.logger.Debug("event API stopped") + e.server.Close() + close(e.Events) +} + +func (e *EventReceiver) handleEvents(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != e.URLPath { + http.NotFound(w, r) + e.logger.WithFields(logrus.Fields{ + "code": "404", + }).Infof("path not found: %s\n", r.URL.Path) + return + } + + if r.Method != http.MethodPost { + e.logger.WithFields(logrus.Fields{ + "code": "405", + }).Infof("method %s not allowed", r.Method) + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + + ct := r.Header.Get("content-type") + if strings.Split(ct, ";")[0] != "application/json" { + e.logger.WithFields(logrus.Fields{ + "code": "415", + }).Info("wrong content type: %s", ct) + w.WriteHeader(http.StatusUnsupportedMediaType) + w.Write([]byte("The content-type must be \"application/json\"")) + return + } + + body, err := ioutil.ReadAll(r.Body) + if err != nil { + e.logger.WithFields(logrus.Fields{ + "code": "500", + }).Errorf("%s", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + return + } + + event := JobEvent{} + err = json.Unmarshal(body, &event) + if err != nil { + e.logger.WithFields(logrus.Fields{ + "code": "400", + }).Infof("could not deserialize body: %s", err.Error()) + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("Could not deserialize body as JSON")) + return + } + + // Guarantee that the Events channel will not be written to if stopped == + // true, because in that case the channel has been closed. + e.mutex.RLock() + defer e.mutex.RUnlock() + if e.stopped { + e.mutex.RUnlock() + w.WriteHeader(http.StatusGone) + e.logger.WithFields(logrus.Fields{ + "code": "410", + }).Info("stopped and not accepting additional events for this job") + return + } + // ansible-runner sends "status events" and "ansible events". The "status + // events" signify a change in the state of ansible-runner itself, which + // we're not currently interested in. + // https://ansible-runner.readthedocs.io/en/latest/external_interface.html#event-structure + if event.UUID == "" { + e.logger.Info("dropping event that is not a JobEvent") + } else { + // timeout if the channel blocks for too long + timeout := time.NewTimer(10 * time.Second) + select { + case e.Events <- event: + case <-timeout.C: + e.logger.WithFields(logrus.Fields{ + "code": "500", + }).Warn("timed out writing event to channel") + w.WriteHeader(http.StatusInternalServerError) + return + } + _ = timeout.Stop() + } + w.WriteHeader(http.StatusNoContent) +} diff --git a/pkg/ansible/runner/eventapi/types.go b/pkg/ansible/runner/eventapi/types.go new file mode 100644 index 0000000000..ffbf2ad743 --- /dev/null +++ b/pkg/ansible/runner/eventapi/types.go @@ -0,0 +1,76 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package eventapi + +import ( + "fmt" + "strings" + "time" +) + +// EventTime - time to unmarshal nano time. +type EventTime struct { + time.Time +} + +// UnmarshalJSON - override unmarshal json. +func (e *EventTime) UnmarshalJSON(b []byte) (err error) { + e.Time, err = time.Parse("2006-01-02T15:04:05.999999999", strings.Trim(string(b[:]), "\"\\")) + if err != nil { + return err + } + return nil +} + +// MarshalJSON - override the marshal json. +func (e EventTime) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("\"%s\"", e.Time.Format("2006-01-02T15:04:05.99999999"))), nil +} + +// JobEvent - event of an ansible run. +type JobEvent struct { + UUID string `json:"uuid"` + Counter int `json:"counter"` + StdOut string `json:"stdout"` + StartLine int `json:"start_line"` + EndLine int `json:"EndLine"` + Event string `json:"event"` + EventData map[string]interface{} `json:"event_data"` + PID int `json:"pid"` + Created EventTime `json:"created"` +} + +// StatusJobEvent - event of an ansible run. +type StatusJobEvent struct { + UUID string `json:"uuid"` + Counter int `json:"counter"` + StdOut string `json:"stdout"` + StartLine int `json:"start_line"` + EndLine int `json:"EndLine"` + Event string `json:"event"` + EventData StatsEventData `json:"event_data"` + PID int `json:"pid"` + Created EventTime `json:"created"` +} + +// StatsEventData - data for a the status event. +type StatsEventData struct { + Playbook string `json:"playbook"` + PlaybookUUID string `json:"playbook_uuid"` + Changed map[string]int `json:"changed"` + Ok map[string]int `json:"ok"` + Failures map[string]int `json:"failures"` + Skipped map[string]int `json:"skipped"` +} diff --git a/pkg/ansible/runner/internal/inputdir/inputdir.go b/pkg/ansible/runner/internal/inputdir/inputdir.go new file mode 100644 index 0000000000..dae6bed5ff --- /dev/null +++ b/pkg/ansible/runner/internal/inputdir/inputdir.go @@ -0,0 +1,123 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package inputdir + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path/filepath" + + "github.com/sirupsen/logrus" +) + +// InputDir represents an input directory for ansible-runner. +type InputDir struct { + Path string + PlaybookPath string + Parameters map[string]interface{} + EnvVars map[string]string + Settings map[string]string +} + +// makeDirs creates the required directory structure. +func (i *InputDir) makeDirs() error { + for _, path := range []string{"env", "project", "inventory"} { + fullPath := filepath.Join(i.Path, path) + err := os.MkdirAll(fullPath, os.ModePerm) + if err != nil { + logrus.Errorf("unable to create directory %v", fullPath) + return err + } + } + return nil +} + +// addFile adds a file to the given relative path within the input directory. +func (i *InputDir) addFile(path string, content []byte) error { + fullPath := filepath.Join(i.Path, path) + err := ioutil.WriteFile(fullPath, content, 0644) + if err != nil { + logrus.Errorf("unable to write file %v", fullPath) + } + return err +} + +// Write commits the object's state to the filesystem at i.Path. +func (i *InputDir) Write() error { + paramBytes, err := json.Marshal(i.Parameters) + if err != nil { + return err + } + envVarBytes, err := json.Marshal(i.EnvVars) + if err != nil { + return err + } + settingsBytes, err := json.Marshal(i.Settings) + if err != nil { + return err + } + + err = i.makeDirs() + if err != nil { + return err + } + + err = i.addFile("env/envvars", envVarBytes) + if err != nil { + return err + } + err = i.addFile("env/extravars", paramBytes) + if err != nil { + return err + } + err = i.addFile("env/settings", settingsBytes) + if err != nil { + return err + } + + // If ansible-runner is running in a python virtual environment, propagate + // that to ansible. + venv := os.Getenv("VIRTUAL_ENV") + hosts := "localhost ansible_connection=local" + if venv != "" { + hosts = fmt.Sprintf("%s ansible_python_interpreter=%s", hosts, filepath.Join(venv, "bin/python")) + } + err = i.addFile("inventory/hosts", []byte(hosts)) + if err != nil { + return err + } + + if i.PlaybookPath != "" { + f, err := os.Open(i.PlaybookPath) + if err != nil { + logrus.Errorf("failed to open playbook file %v", i.PlaybookPath) + return err + } + defer f.Close() + + playbookBytes, err := ioutil.ReadAll(f) + if err != nil { + return err + } + + err = i.addFile("project/playbook.yaml", playbookBytes) + if err != nil { + return err + } + } + return nil +} diff --git a/pkg/ansible/runner/runner.go b/pkg/ansible/runner/runner.go new file mode 100644 index 0000000000..737050b547 --- /dev/null +++ b/pkg/ansible/runner/runner.go @@ -0,0 +1,303 @@ +// Copyright 2018 The Operator-SDK Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runner + +import ( + "errors" + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + + "github.com/operator-framework/operator-sdk/pkg/ansible/paramconv" + "github.com/operator-framework/operator-sdk/pkg/ansible/runner/eventapi" + "github.com/operator-framework/operator-sdk/pkg/ansible/runner/internal/inputdir" + "github.com/sirupsen/logrus" + yaml "gopkg.in/yaml.v2" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// Runner - a runnable that should take the parameters and name and namespace +// and run the correct code. +type Runner interface { + Run(*unstructured.Unstructured, string) (chan eventapi.JobEvent, error) + GetFinalizer() (string, bool) +} + +// watch holds data used to create a mapping of GVK to ansible playbook or role. +// The mapping is used to compose an ansible operator. +type watch struct { + Version string `yaml:"version"` + Group string `yaml:"group"` + Kind string `yaml:"kind"` + Playbook string `yaml:"playbook"` + Role string `yaml:"role"` + Finalizer *Finalizer `yaml:"finalizer"` +} + +// Finalizer - Expose finalizer to be used by a user. +type Finalizer struct { + Name string `yaml:"name"` + Playbook string `yaml:"playbook"` + Role string `yaml:"role"` + Vars map[string]interface{} `yaml:"vars"` +} + +// NewFromWatches reads the operator's config file at the provided path. +func NewFromWatches(path string) (map[schema.GroupVersionKind]Runner, error) { + b, err := ioutil.ReadFile(path) + if err != nil { + logrus.Errorf("failed to get config file %v", err) + return nil, err + } + watches := []watch{} + err = yaml.Unmarshal(b, &watches) + if err != nil { + logrus.Errorf("failed to unmarshal config %v", err) + return nil, err + } + + m := map[schema.GroupVersionKind]Runner{} + for _, w := range watches { + s := schema.GroupVersionKind{ + Group: w.Group, + Version: w.Version, + Kind: w.Kind, + } + // Check if schema is a duplicate + if _, ok := m[s]; ok { + return nil, fmt.Errorf("duplicate GVK: %v", s.String()) + } + switch { + case w.Playbook != "": + r, err := NewForPlaybook(w.Playbook, s, w.Finalizer) + if err != nil { + return nil, err + } + m[s] = r + case w.Role != "": + r, err := NewForRole(w.Role, s, w.Finalizer) + if err != nil { + return nil, err + } + m[s] = r + default: + return nil, fmt.Errorf("either playbook or role must be defined for %v", s) + } + } + return m, nil +} + +// NewForPlaybook returns a new Runner based on the path to an ansible playbook. +func NewForPlaybook(path string, gvk schema.GroupVersionKind, finalizer *Finalizer) (Runner, error) { + if !filepath.IsAbs(path) { + return nil, fmt.Errorf("playbook path must be absolute for %v", gvk) + } + r := &runner{ + Path: path, + GVK: gvk, + cmdFunc: func(ident, inputDirPath string) *exec.Cmd { + return exec.Command("ansible-runner", "-vv", "-p", path, "-i", ident, "run", inputDirPath) + }, + } + err := r.addFinalizer(finalizer) + if err != nil { + return nil, err + } + return r, nil +} + +// NewForRole returns a new Runner based on the path to an ansible role. +func NewForRole(path string, gvk schema.GroupVersionKind, finalizer *Finalizer) (Runner, error) { + if !filepath.IsAbs(path) { + return nil, fmt.Errorf("role path must be absolute for %v", gvk) + } + path = strings.TrimRight(path, "/") + r := &runner{ + Path: path, + GVK: gvk, + cmdFunc: func(ident, inputDirPath string) *exec.Cmd { + rolePath, roleName := filepath.Split(path) + return exec.Command("ansible-runner", "-vv", "--role", roleName, "--roles-path", rolePath, "--hosts", "localhost", "-i", ident, "run", inputDirPath) + }, + } + err := r.addFinalizer(finalizer) + if err != nil { + return nil, err + } + return r, nil +} + +// runner - implements the Runner interface for a GVK that's being watched. +type runner struct { + Path string // path on disk to a playbook or role depending on what cmdFunc expects + GVK schema.GroupVersionKind // GVK being watched that corresponds to the Path + Finalizer *Finalizer + cmdFunc func(ident, inputDirPath string) *exec.Cmd // returns a Cmd that runs ansible-runner + finalizerCmdFunc func(ident, inputDirPath string) *exec.Cmd +} + +func (r *runner) Run(u *unstructured.Unstructured, kubeconfig string) (chan eventapi.JobEvent, error) { + if u.GetDeletionTimestamp() != nil && !r.isFinalizerRun(u) { + return nil, errors.New("resource has been deleted, but no finalizer was matched, skipping reconciliation") + } + ident := strconv.Itoa(rand.Int()) + logger := logrus.WithFields(logrus.Fields{ + "component": "runner", + "job": ident, + "name": u.GetName(), + "namespace": u.GetNamespace(), + }) + // start the event receiver. We'll check errChan for an error after + // ansible-runner exits. + errChan := make(chan error, 1) + receiver, err := eventapi.New(ident, errChan) + if err != nil { + return nil, err + } + inputDir := inputdir.InputDir{ + Path: filepath.Join("/tmp/ansible-operator/runner/", r.GVK.Group, r.GVK.Version, r.GVK.Kind, u.GetNamespace(), u.GetName()), + Parameters: r.makeParameters(u), + EnvVars: map[string]string{ + "K8S_AUTH_KUBECONFIG": kubeconfig, + }, + Settings: map[string]string{ + "runner_http_url": receiver.SocketPath, + "runner_http_path": receiver.URLPath, + }, + } + // If Path is a dir, assume it is a role path. Otherwise assume it's a + // playbook path + fi, err := os.Lstat(r.Path) + if err != nil { + return nil, err + } + if !fi.IsDir() { + inputDir.PlaybookPath = r.Path + } + err = inputDir.Write() + if err != nil { + return nil, err + } + + go func() { + var dc *exec.Cmd + if r.isFinalizerRun(u) { + logger.Debugf("Resource is marked for deletion, running finalizer %s", r.Finalizer.Name) + dc = r.finalizerCmdFunc(ident, inputDir.Path) + } else { + dc = r.cmdFunc(ident, inputDir.Path) + } + + err := dc.Run() + if err != nil { + logger.Errorf("error from ansible-runner: %s", err.Error()) + } else { + logger.Info("ansible-runner exited successfully") + } + + receiver.Close() + err = <-errChan + // http.Server returns this in the case of being closed cleanly + if err != nil && err != http.ErrServerClosed { + logger.Errorf("error from event api: %s", err.Error()) + } + }() + return receiver.Events, nil +} + +func (r *runner) GetFinalizer() (string, bool) { + if r.Finalizer != nil { + return r.Finalizer.Name, true + } + return "", false +} + +func (r *runner) isFinalizerRun(u *unstructured.Unstructured) bool { + finalizersSet := r.Finalizer != nil && u.GetFinalizers() != nil + // The resource is deleted and our finalizer is present, we need to run the finalizer + if finalizersSet && u.GetDeletionTimestamp() != nil { + for _, f := range u.GetFinalizers() { + if f == r.Finalizer.Name { + return true + } + } + } + return false +} + +func (r *runner) addFinalizer(finalizer *Finalizer) error { + r.Finalizer = finalizer + switch { + case finalizer == nil: + return nil + case finalizer.Playbook != "": + if !filepath.IsAbs(finalizer.Playbook) { + return fmt.Errorf("finalizer playbook path must be absolute for %v", r.GVK) + } + r.finalizerCmdFunc = func(ident, inputDirPath string) *exec.Cmd { + return exec.Command("ansible-runner", "-vv", "-p", finalizer.Playbook, "-i", ident, "run", inputDirPath) + } + case finalizer.Role != "": + if !filepath.IsAbs(finalizer.Role) { + return fmt.Errorf("finalizer role path must be absolute for %v", r.GVK) + } + r.finalizerCmdFunc = func(ident, inputDirPath string) *exec.Cmd { + path := strings.TrimRight(finalizer.Role, "/") + rolePath, roleName := filepath.Split(path) + return exec.Command("ansible-runner", "-vv", "--role", roleName, "--roles-path", rolePath, "--hosts", "localhost", "-i", ident, "run", inputDirPath) + } + case len(finalizer.Vars) != 0: + r.finalizerCmdFunc = r.cmdFunc + } + return nil +} + +// makeParameters - creates the extravars parameters for ansible +// The resulting structure in json is: +// { "meta": { +// "name": , +// "namespace": , +// }, +// , +// ... +// __: { +//