Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog/fragments/modify-ansible-flags.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
entries:
- description: >
Add "--max-concurrent-reconciles" flag and "MAX_CONCURRENT_RECONCILES_<Kind>_<Group>" global variable
to ansible binary. Also add deprecate message for "--max-workers" flag and "WORKERS_<Kind>_<Group>"
global variable.
kind: "addition"
20 changes: 18 additions & 2 deletions cmd/ansible-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,13 @@ func printVersion() {

func main() {
f := &flags.Flags{}
f.AddTo(pflag.CommandLine)
err := f.AddTo(pflag.CommandLine)
if err != nil {
log.Error(err, "Failed to add ansible command line flags")
}
pflag.Parse()
logf.SetLogger(zap.Logger())
warningMsgForDeprecatedFlags()

printVersion()

Expand Down Expand Up @@ -127,7 +131,7 @@ func main() {

var gvks []schema.GroupVersionKind
cMap := controllermap.NewControllerMap()
watches, err := watches.Load(f.WatchesFile, f.MaxWorkers, f.AnsibleVerbosity)
watches, err := watches.Load(f.WatchesFile, f.MaxConcurrentReconciles, f.AnsibleVerbosity)
if err != nil {
log.Error(err, "Failed to load watches.")
os.Exit(1)
Expand Down Expand Up @@ -296,3 +300,15 @@ func getAnsibleDebugLog() bool {
}
return val
}

// warningMsgForDeprecatedFlags logs warning messages if deprecated flags are used. Currently,
// "--max-workers" is deprecated. Any value provided using this flags will not be used. Instead
// users are directed to use "--max-concurrent-reconciles".
func warningMsgForDeprecatedFlags() {
if pflag.Lookup("max-workers").Changed {
log.Info("Flag --max-workers has been deprecated, use --max-concurrent-reconciles instead")
if pflag.Lookup("max-concurrent-reconciles").Changed {
log.Info("Ignoring --max-workers since --max-concurrent-reconciles is set")
}
}
}
29 changes: 20 additions & 9 deletions pkg/ansible/flags/flag.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package flags

import (
"fmt"
"runtime"
"time"

Expand All @@ -25,21 +26,21 @@ import (

// Flags - Options to be used by an ansible operator
type Flags struct {
ReconcilePeriod time.Duration
WatchesFile string
InjectOwnerRef bool
MaxWorkers int
AnsibleVerbosity int
AnsibleRolesPath string
AnsibleCollectionsPath string
ReconcilePeriod time.Duration
WatchesFile string
InjectOwnerRef bool
MaxConcurrentReconciles int
MaxWorkers int
AnsibleVerbosity int
AnsibleRolesPath string
AnsibleCollectionsPath string
}

const AnsibleRolesPathEnvVar = "ANSIBLE_ROLES_PATH"
const AnsibleCollectionsPathEnvVar = "ANSIBLE_COLLECTIONS_PATH"

// AddTo - Add the ansible operator flags to the the flagset
// helpTextPrefix will allow you add a prefix to default help text. Joined by a space.
func (f *Flags) AddTo(flagSet *pflag.FlagSet, helpTextPrefix ...string) {
func (f *Flags) AddTo(flagSet *pflag.FlagSet) error {
flagSet.AddFlagSet(zap.FlagSet())
flagSet.DurationVar(&f.ReconcilePeriod,
"reconcile-period",
Expand All @@ -61,6 +62,11 @@ func (f *Flags) AddTo(flagSet *pflag.FlagSet, helpTextPrefix ...string) {
runtime.NumCPU(),
"Maximum number of workers to use. Overridden by environment variable.",
)
flagSet.IntVar(&f.MaxConcurrentReconciles,
"max-concurrent-reconciles",
runtime.NumCPU(),
"Maximum number of concurrent reconciles for controllers. Overridden by environment variable.",
)
flagSet.IntVar(&f.AnsibleVerbosity,
"ansible-verbosity",
2,
Expand All @@ -76,4 +82,9 @@ func (f *Flags) AddTo(flagSet *pflag.FlagSet, helpTextPrefix ...string) {
"",
"Path to installed Ansible Collections. If set, collections should be located in {{value}}/ansible_collections/. If unset, collections are assumed to be in ~/.ansible/collections or /usr/share/ansible/collections.",
)
err := flagSet.MarkDeprecated("max-workers", "use --max-concurrent-reconciles instead.")
if err != nil {
return fmt.Errorf("flag cannot be deprecated %v", err)
}
return nil
}
60 changes: 47 additions & 13 deletions pkg/ansible/watches/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io/ioutil"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -77,8 +78,8 @@ var (
selectorDefault = metav1.LabelSelector{}

// these are overridden by cmdline flags
maxWorkersDefault = 1
ansibleVerbosityDefault = 2
maxConcurrentReconcilesDefault = runtime.NumCPU()
ansibleVerbosityDefault = 2
)

// Creates, populates, and returns a LabelSelector object. Used in Unmarshal().
Expand Down Expand Up @@ -171,7 +172,7 @@ func (w *Watch) setValuesFromAlias(tmp alias) error {
w.Role = tmp.Role
w.Vars = tmp.Vars
w.MaxRunnerArtifacts = tmp.MaxRunnerArtifacts
w.MaxWorkers = getMaxWorkers(gvk, maxWorkersDefault)
w.MaxWorkers = getMaxConcurrentReconciles(gvk, maxConcurrentReconcilesDefault)
w.ReconcilePeriod = tmp.ReconcilePeriod.Duration
w.ManageStatus = *tmp.ManageStatus
w.WatchDependentResources = *tmp.WatchDependentResources
Expand Down Expand Up @@ -297,7 +298,7 @@ func New(gvk schema.GroupVersionKind, role, playbook string, vars map[string]int
Role: role,
Vars: vars,
MaxRunnerArtifacts: maxRunnerArtifactsDefault,
MaxWorkers: maxWorkersDefault,
MaxWorkers: maxConcurrentReconcilesDefault,
ReconcilePeriod: reconcilePeriodDefault.Duration,
ManageStatus: manageStatusDefault,
WatchDependentResources: watchDependentResourcesDefault,
Expand All @@ -309,8 +310,8 @@ func New(gvk schema.GroupVersionKind, role, playbook string, vars map[string]int
}

// Load - loads a slice of Watches from the watches file from the CLI
func Load(path string, maxWorkers, ansibleVerbosity int) ([]Watch, error) {
maxWorkersDefault = maxWorkers
func Load(path string, maxReconciler, ansibleVerbosity int) ([]Watch, error) {
maxConcurrentReconcilesDefault = maxReconciler
ansibleVerbosityDefault = ansibleVerbosity
b, err := ioutil.ReadFile(path)
if err != nil {
Expand Down Expand Up @@ -394,19 +395,23 @@ func verifyAnsiblePath(playbook string, role string) error {
// number of workers based on their cluster resources. While the
// author may use the CLI option to specify a suggested
// configuration for the operator.
func getMaxWorkers(gvk schema.GroupVersionKind, defValue int) int {
envVar := strings.ToUpper(strings.Replace(
func getMaxConcurrentReconciles(gvk schema.GroupVersionKind, defValue int) int {
envVarMaxWorker := strings.ToUpper(strings.ReplaceAll(
fmt.Sprintf("WORKER_%s_%s", gvk.Kind, gvk.Group),
".",
"_",
-1,
))
maxWorkers := getIntegerEnvWithDefault(envVar, defValue)
if maxWorkers <= 0 {
log.Info("Value %v not valid. Using default %v", maxWorkers, defValue)
envVarMaxReconciler := strings.ToUpper(strings.ReplaceAll(
fmt.Sprintf("MAX_CONCURRENT_RECONCILES_%s_%s", gvk.Kind, gvk.Group),
".",
"_",
))
envVal := getIntegerEnvMaxReconcile(envVarMaxWorker, envVarMaxReconciler, defValue)
if envVal <= 0 {
log.Info("Value %v not valid. Using default %v", envVal, defValue)
return defValue
}
return maxWorkers
return envVal
}

// if the ANSIBLE_VERBOSITY_* environment variable is set, use that value.
Expand Down Expand Up @@ -448,3 +453,32 @@ func getIntegerEnvWithDefault(envVar string, defValue int) int {
}
return val
}

// getIntegerEnvMaxReconcile looks for global variable "MAX_CONCURRENT_RECONCILES_<group>_<kind>",
// if not present it checks for "WORKER_<group>_<kind>" and logs deprecation message
// if required. If both of them are not set, we use the default value passed on by command line
// flags.
func getIntegerEnvMaxReconcile(envVarMaxWorker, envVarMaxReconciler string, defValue int) int {
val := defValue
if envValRecon, ok := os.LookupEnv(envVarMaxReconciler); ok {
if i, err := strconv.Atoi(envValRecon); err != nil {
log.Info("Could not parse environment variable as an integer; using default value",
"envVar", envVarMaxReconciler, "default", defValue)
} else {
val = i
}
} else if !ok {
if envValWorker, ok := os.LookupEnv(envVarMaxWorker); ok {
deprecationMsg := fmt.Sprintf("Environment variable %s is deprecated, use %s instead", envVarMaxWorker, envVarMaxReconciler)
log.Info(deprecationMsg)
if i, err := strconv.Atoi(envValWorker); err != nil {
log.Info("Could not parse environment variable as an integer; using default value",
"envVar", envVarMaxWorker, "default", defValue)
} else {
val = i
}
}
}
return val

}
62 changes: 48 additions & 14 deletions pkg/ansible/watches/watches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func TestNew(t *testing.T) {
t.Fatalf("Unexpected maxRunnerArtifacts %v expected %v", watch.MaxRunnerArtifacts,
maxRunnerArtifactsDefault)
}
if watch.MaxWorkers != maxWorkersDefault {
t.Fatalf("Unexpected maxWorkers %v expected %v", watch.MaxWorkers, maxWorkersDefault)
if watch.MaxWorkers != maxConcurrentReconcilesDefault {
t.Fatalf("Unexpected maxWorkers %v expected %v", watch.MaxWorkers, maxConcurrentReconcilesDefault)
}
if watch.ReconcilePeriod != expectedReconcilePeriod {
t.Fatalf("Unexpected reconcilePeriod %v expected %v", watch.ReconcilePeriod,
Expand Down Expand Up @@ -563,8 +563,7 @@ func TestMaxWorkers(t *testing.T) {
defValue int
expectedValue int
setEnv bool
envKey string
envValue int
envVarMap map[string]int
}{
{
name: "no env, use default value",
Expand All @@ -576,7 +575,9 @@ func TestMaxWorkers(t *testing.T) {
defValue: 1,
expectedValue: 1,
setEnv: false,
envKey: "WORKER_MEMCACHESERVICE_CACHE_EXAMPLE_COM",
envVarMap: map[string]int{
"WORKER_MEMCACHESERVICE_CACHE_EXAMPLE_COM": 0,
},
},
{
name: "invalid env, use default value",
Expand All @@ -588,11 +589,12 @@ func TestMaxWorkers(t *testing.T) {
defValue: 1,
expectedValue: 1,
setEnv: true,
envKey: "WORKER_MEMCACHESERVICE_CACHE_EXAMPLE_COM",
envValue: 0,
envVarMap: map[string]int{
"WORKER_MEMCACHESERVICE_CACHE_EXAMPLE_COM": 0,
},
},
{
name: "env set to 3, expect 3",
name: "worker_%s_%s env set to 3, expect 3",
gvk: schema.GroupVersionKind{
Group: "cache.example.com",
Version: "v1alpha1",
Expand All @@ -601,18 +603,50 @@ func TestMaxWorkers(t *testing.T) {
defValue: 1,
expectedValue: 3,
setEnv: true,
envKey: "WORKER_MEMCACHESERVICE_CACHE_EXAMPLE_COM",
envValue: 3,
envVarMap: map[string]int{
"WORKER_MEMCACHESERVICE_CACHE_EXAMPLE_COM": 3,
},
},
{
name: "max_concurrent_reconciler_%s_%s set to 2, expect 2",
gvk: schema.GroupVersionKind{
Group: "cache.example.com",
Version: "v1alpha1",
Kind: "MemCacheService",
},
defValue: 1,
expectedValue: 2,
setEnv: true,
envVarMap: map[string]int{
"MAX_CONCURRENT_RECONCILES_MEMCACHESERVICE_CACHE_EXAMPLE_COM": 2,
},
},
{
name: "set multiple env variables",
gvk: schema.GroupVersionKind{
Group: "cache.example.com",
Version: "v1alpha1",
Kind: "MemCacheService",
},
defValue: 1,
expectedValue: 3,
setEnv: true,
envVarMap: map[string]int{
"MAX_CONCURRENT_RECONCILES_MEMCACHESERVICE_CACHE_EXAMPLE_COM": 3,
"WORKER_MEMCACHESERVICE_CACHE_EXAMPLE_COM": 1,
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
os.Unsetenv(tc.envKey)
if tc.setEnv {
os.Setenv(tc.envKey, strconv.Itoa(tc.envValue))
for key, val := range tc.envVarMap {
os.Unsetenv(key)
if tc.setEnv {
os.Setenv(key, strconv.Itoa(val))
}
Comment thread
varshaprasad96 marked this conversation as resolved.
}
workers := getMaxWorkers(tc.gvk, tc.defValue)
workers := getMaxConcurrentReconciles(tc.gvk, tc.defValue)
if tc.expectedValue != workers {
t.Fatalf("Unexpected MaxWorkers: %v expected MaxWorkers: %v", workers, tc.expectedValue)
}
Expand Down
24 changes: 14 additions & 10 deletions website/content/en/docs/ansible/reference/advanced_options.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ If you have created resources without owner reference injection, it is
possible to manually to update resources following [this
guide.](../retroactively-owned-resources)

## Max Workers
## Max Concurrent Reconciles

Increasing the number of workers allows events to be processed
Increasing the number of concurrent reconciles allows events to be processed
concurrently, which can improve reconciliation performance.

Worker maximums can be set in two ways. Operator **authors and admins**
can set the max workers default by including extra args to the operator
container in `deploy/operator.yaml`. (Otherwise, the default is 1 worker.)
The maximum number of concurrent reconciles can be set in two ways. Operator **authors and admins**
can set the max concurrent reconciles default by including extra args to the operator
container in `deploy/operator.yaml`. (Otherwise, the default is the maximum number of logical CPUs available for the process obtained using `runtime.NumCPU()`.)

**NOTE:** Admins using OLM should use the environment variable instead
of the extra args.
Expand All @@ -46,12 +46,14 @@ of the extra args.
image: "quay.io/asmacdo/memcached-operator:v0.0.0"
imagePullPolicy: "Always"
args:
- "--max-workers"
- "--max-concurrent-reconciles"
- "3"
```
**Note**
Previously, `--max-workers` was used and is replaced by `--max-concurrent-reconciles`.

Operator **admins** can override the value by setting an environment
variable in the format `WORKER_<kind>_<group>`. This variable must be
variable in the format `MAX_CONCURRENT_RECONCILES_<kind>_<group>`. This variable must be
all uppercase, and periods (e.g. in the group name) are replaced with underscores.

For the memcached operator example, the component parts are retrieved
Expand All @@ -68,7 +70,7 @@ metadata:
```

From this data, we can see that the environment variable will be
`WORKER_MEMCACHED_CACHE_EXAMPLE_COM`, which we can then add to
`MAX_CONCURRENT_RECONCILES_MEMCACHED_CACHE_EXAMPLE_COM`, which we can then add to
`deploy/operator.yaml`:

``` yaml
Expand All @@ -77,13 +79,15 @@ From this data, we can see that the environment variable will be
imagePullPolicy: "Always"
args:
# This default is overridden.
- "--max-workers"
- "--max-reconciles"
- "3"
env:
# This value is used
- name: WORKER_MEMCACHED_CACHE_EXAMPLE_COM
- name: MAX_CONCURRENT_RECONCILES_MEMCACHED_CACHE_EXAMPLE_COM
value: "6"
```
**Note**
Previously, the naming convention for global variable was `WORKERS_%s_%s`. It has been updated to `MAX_CONCURRENT_RECONCILES_%s_%s`. Currently, though we accept inputs to both the variables, `MAX_CONCURRENT_RECONCILES_%s_%s` takes precedence over the formerly used one.

## Ansible Verbosity

Expand Down