-
Notifications
You must be signed in to change notification settings - Fork 1.8k
pkg/ansible: Adding ansible operator runner package #472
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
shawn-hurley
merged 1 commit into
operator-framework:master
from
shawn-hurley:feature/add-AO-runner
Sep 19, 2018
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,184 @@ | ||
| // Copyright 2018 The Operator-SDK Authors | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package eventapi | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
| "fmt" | ||
| "io" | ||
| "io/ioutil" | ||
| "net" | ||
| "net/http" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
||
| "github.com/sirupsen/logrus" | ||
| ) | ||
|
|
||
| // EventReceiver serves the event API | ||
| type EventReceiver struct { | ||
| // Events is the channel used by the event API handler to send JobEvents | ||
| // back to the runner, or whatever code is using this receiver. | ||
| Events chan JobEvent | ||
|
|
||
| // SocketPath is the path on the filesystem to a unix streaming socket | ||
| SocketPath string | ||
|
|
||
| // URLPath is the path portion of the url at which events should be | ||
| // received. For example, "/events/" | ||
| URLPath string | ||
|
|
||
| // server is the http.Server instance that serves the event API. It must be | ||
| // closed. | ||
| server io.Closer | ||
|
|
||
| // stopped indicates if this receiver has permanently stopped receiving | ||
| // events. When true, requests to POST an event will receive a "410 Gone" | ||
| // response, and the body will be ignored. | ||
| stopped bool | ||
|
|
||
| // mutex controls access to the "stopped" bool above, ensuring that writes | ||
| // are goroutine-safe. | ||
| mutex sync.RWMutex | ||
|
|
||
| // ident is the unique identifier for a particular run of ansible-runner | ||
| ident string | ||
|
|
||
| // logger holds a logger that has some fields already set | ||
| logger logrus.FieldLogger | ||
| } | ||
|
|
||
| func New(ident string, errChan chan<- error) (*EventReceiver, error) { | ||
| sockPath := fmt.Sprintf("/tmp/ansibleoperator-%s", ident) | ||
| listener, err := net.Listen("unix", sockPath) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| rec := EventReceiver{ | ||
| Events: make(chan JobEvent, 1000), | ||
| SocketPath: sockPath, | ||
| URLPath: "/events/", | ||
| ident: ident, | ||
| logger: logrus.WithFields(logrus.Fields{ | ||
| "component": "eventapi", | ||
| "job": ident, | ||
| }), | ||
| } | ||
|
|
||
| mux := http.NewServeMux() | ||
| mux.HandleFunc(rec.URLPath, rec.handleEvents) | ||
| srv := http.Server{Handler: mux} | ||
| rec.server = &srv | ||
|
|
||
| go func() { | ||
| errChan <- srv.Serve(listener) | ||
| }() | ||
| return &rec, nil | ||
| } | ||
|
|
||
| // Close ensures that appropriate resources are cleaned up, such as any unix | ||
| // streaming socket that may be in use. Close must be called. | ||
| func (e *EventReceiver) Close() { | ||
| e.mutex.Lock() | ||
| e.stopped = true | ||
| e.mutex.Unlock() | ||
| e.logger.Debug("event API stopped") | ||
| e.server.Close() | ||
| close(e.Events) | ||
| } | ||
|
|
||
| func (e *EventReceiver) handleEvents(w http.ResponseWriter, r *http.Request) { | ||
| if r.URL.Path != e.URLPath { | ||
| http.NotFound(w, r) | ||
| e.logger.WithFields(logrus.Fields{ | ||
| "code": "404", | ||
| }).Infof("path not found: %s\n", r.URL.Path) | ||
| return | ||
| } | ||
|
|
||
| if r.Method != http.MethodPost { | ||
| e.logger.WithFields(logrus.Fields{ | ||
| "code": "405", | ||
| }).Infof("method %s not allowed", r.Method) | ||
| w.WriteHeader(http.StatusMethodNotAllowed) | ||
| return | ||
| } | ||
|
|
||
| ct := r.Header.Get("content-type") | ||
| if strings.Split(ct, ";")[0] != "application/json" { | ||
| e.logger.WithFields(logrus.Fields{ | ||
| "code": "415", | ||
| }).Info("wrong content type: %s", ct) | ||
| w.WriteHeader(http.StatusUnsupportedMediaType) | ||
| w.Write([]byte("The content-type must be \"application/json\"")) | ||
| return | ||
| } | ||
|
|
||
| body, err := ioutil.ReadAll(r.Body) | ||
| if err != nil { | ||
| e.logger.WithFields(logrus.Fields{ | ||
| "code": "500", | ||
| }).Errorf("%s", err.Error()) | ||
| w.WriteHeader(http.StatusInternalServerError) | ||
| return | ||
| } | ||
|
|
||
| event := JobEvent{} | ||
| err = json.Unmarshal(body, &event) | ||
| if err != nil { | ||
| e.logger.WithFields(logrus.Fields{ | ||
| "code": "400", | ||
| }).Infof("could not deserialize body: %s", err.Error()) | ||
| w.WriteHeader(http.StatusBadRequest) | ||
| w.Write([]byte("Could not deserialize body as JSON")) | ||
| return | ||
| } | ||
|
|
||
| // Guarantee that the Events channel will not be written to if stopped == | ||
| // true, because in that case the channel has been closed. | ||
| e.mutex.RLock() | ||
| defer e.mutex.RUnlock() | ||
| if e.stopped { | ||
| e.mutex.RUnlock() | ||
| w.WriteHeader(http.StatusGone) | ||
| e.logger.WithFields(logrus.Fields{ | ||
| "code": "410", | ||
| }).Info("stopped and not accepting additional events for this job") | ||
| return | ||
| } | ||
| // ansible-runner sends "status events" and "ansible events". The "status | ||
| // events" signify a change in the state of ansible-runner itself, which | ||
| // we're not currently interested in. | ||
| // https://ansible-runner.readthedocs.io/en/latest/external_interface.html#event-structure | ||
| if event.UUID == "" { | ||
| e.logger.Info("dropping event that is not a JobEvent") | ||
| } else { | ||
| // timeout if the channel blocks for too long | ||
| timeout := time.NewTimer(10 * time.Second) | ||
| select { | ||
| case e.Events <- event: | ||
| case <-timeout.C: | ||
| e.logger.WithFields(logrus.Fields{ | ||
| "code": "500", | ||
| }).Warn("timed out writing event to channel") | ||
| w.WriteHeader(http.StatusInternalServerError) | ||
| return | ||
| } | ||
| _ = timeout.Stop() | ||
| } | ||
| w.WriteHeader(http.StatusNoContent) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| // Copyright 2018 The Operator-SDK Authors | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package eventapi | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "strings" | ||
| "time" | ||
| ) | ||
|
|
||
| // EventTime - time to unmarshal nano time. | ||
| type EventTime struct { | ||
| time.Time | ||
| } | ||
|
|
||
| // UnmarshalJSON - override unmarshal json. | ||
| func (e *EventTime) UnmarshalJSON(b []byte) (err error) { | ||
| e.Time, err = time.Parse("2006-01-02T15:04:05.999999999", strings.Trim(string(b[:]), "\"\\")) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // MarshalJSON - override the marshal json. | ||
| func (e EventTime) MarshalJSON() ([]byte, error) { | ||
| return []byte(fmt.Sprintf("\"%s\"", e.Time.Format("2006-01-02T15:04:05.99999999"))), nil | ||
| } | ||
|
|
||
| // JobEvent - event of an ansible run. | ||
| type JobEvent struct { | ||
| UUID string `json:"uuid"` | ||
| Counter int `json:"counter"` | ||
| StdOut string `json:"stdout"` | ||
| StartLine int `json:"start_line"` | ||
| EndLine int `json:"EndLine"` | ||
| Event string `json:"event"` | ||
| EventData map[string]interface{} `json:"event_data"` | ||
| PID int `json:"pid"` | ||
| Created EventTime `json:"created"` | ||
| } | ||
|
|
||
| // StatusJobEvent - event of an ansible run. | ||
| type StatusJobEvent struct { | ||
| UUID string `json:"uuid"` | ||
| Counter int `json:"counter"` | ||
| StdOut string `json:"stdout"` | ||
| StartLine int `json:"start_line"` | ||
| EndLine int `json:"EndLine"` | ||
| Event string `json:"event"` | ||
| EventData StatsEventData `json:"event_data"` | ||
| PID int `json:"pid"` | ||
| Created EventTime `json:"created"` | ||
| } | ||
|
|
||
| // StatsEventData - data for a the status event. | ||
| type StatsEventData struct { | ||
| Playbook string `json:"playbook"` | ||
| PlaybookUUID string `json:"playbook_uuid"` | ||
| Changed map[string]int `json:"changed"` | ||
| Ok map[string]int `json:"ok"` | ||
| Failures map[string]int `json:"failures"` | ||
| Skipped map[string]int `json:"skipped"` | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,123 @@ | ||
| // Copyright 2018 The Operator-SDK Authors | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| package inputdir | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
| "fmt" | ||
| "io/ioutil" | ||
| "os" | ||
| "path/filepath" | ||
|
|
||
| "github.com/sirupsen/logrus" | ||
| ) | ||
|
|
||
| // InputDir represents an input directory for ansible-runner. | ||
| type InputDir struct { | ||
| Path string | ||
| PlaybookPath string | ||
| Parameters map[string]interface{} | ||
| EnvVars map[string]string | ||
| Settings map[string]string | ||
| } | ||
|
|
||
| // makeDirs creates the required directory structure. | ||
| func (i *InputDir) makeDirs() error { | ||
| for _, path := range []string{"env", "project", "inventory"} { | ||
| fullPath := filepath.Join(i.Path, path) | ||
| err := os.MkdirAll(fullPath, os.ModePerm) | ||
| if err != nil { | ||
| logrus.Errorf("unable to create directory %v", fullPath) | ||
| return err | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| // addFile adds a file to the given relative path within the input directory. | ||
| func (i *InputDir) addFile(path string, content []byte) error { | ||
| fullPath := filepath.Join(i.Path, path) | ||
| err := ioutil.WriteFile(fullPath, content, 0644) | ||
| if err != nil { | ||
| logrus.Errorf("unable to write file %v", fullPath) | ||
| } | ||
| return err | ||
| } | ||
|
|
||
| // Write commits the object's state to the filesystem at i.Path. | ||
| func (i *InputDir) Write() error { | ||
| paramBytes, err := json.Marshal(i.Parameters) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| envVarBytes, err := json.Marshal(i.EnvVars) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| settingsBytes, err := json.Marshal(i.Settings) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| err = i.makeDirs() | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| err = i.addFile("env/envvars", envVarBytes) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| err = i.addFile("env/extravars", paramBytes) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| err = i.addFile("env/settings", settingsBytes) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // If ansible-runner is running in a python virtual environment, propagate | ||
| // that to ansible. | ||
| venv := os.Getenv("VIRTUAL_ENV") | ||
| hosts := "localhost ansible_connection=local" | ||
| if venv != "" { | ||
| hosts = fmt.Sprintf("%s ansible_python_interpreter=%s", hosts, filepath.Join(venv, "bin/python")) | ||
| } | ||
| err = i.addFile("inventory/hosts", []byte(hosts)) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| if i.PlaybookPath != "" { | ||
| f, err := os.Open(i.PlaybookPath) | ||
| if err != nil { | ||
| logrus.Errorf("failed to open playbook file %v", i.PlaybookPath) | ||
| return err | ||
| } | ||
| defer f.Close() | ||
|
|
||
| playbookBytes, err := ioutil.ReadAll(f) | ||
| if err != nil { | ||
| return err | ||
| } | ||
|
|
||
| err = i.addFile("project/playbook.yaml", playbookBytes) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we change to
I think the code has less nested if statement which makes the code cleaner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be a little cleaner but
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fair enough. let's keep as it is.