Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions test/upgrade/probe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ func TestContinuousEventsPropagationWithProber(t *testing.T) {

config := prober.NewConfig(client.Namespace)

// FIXME: https://github.com/knative/eventing/issues/2665
config.FailOnErrors = false

// Use zap.SugarLogger instead of t.Logf because we want to see failures
// inline with other logs instead of buffered until the end.
log := createLogger()
Expand Down
2 changes: 1 addition & 1 deletion test/upgrade/prober/config.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# logLevel = 5 # DEBUG(5)
[sender]
address = '{{- .BrokerUrl -}}'
address = '{{- .BrokerURL -}}'
interval = {{ .Config.Interval.Nanoseconds }}
[forwarder]
target = 'http://wathola-receiver.{{- .Config.Namespace -}}.svc.cluster.local'
126 changes: 101 additions & 25 deletions test/upgrade/prober/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"path"
"runtime"
"text/template"
"time"

"github.com/kelseyhightower/envconfig"
"github.com/wavesoftware/go-ensure"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
Expand All @@ -34,17 +36,82 @@ import (
)

const (
configName = "wathola-config"
configMountPoint = "/home/nonroot/.config/wathola"
configFilename = "config.toml"
watholaEventNs = "com.github.cardil.wathola"
healthEndpoint = "/healthz"
defaultConfigName = "wathola-config"
defaultConfigHomedirPath = ".config/wathola"
defaultHomedir = "/home/nonroot"
defaultConfigFilename = "config.toml"
defaultWatholaEventsPrefix = "com.github.cardil.wathola"
defaultBrokerName = "default"
defaultHealthEndpoint = "/healthz"
defaultFinishedSleep = 5 * time.Second
)

var (
eventTypes = []string{"step", "finished"}
brokerName = "default"
)
var eventTypes = []string{"step", "finished"}

// Config represents a configuration for prober.
type Config struct {
Wathola
Namespace string
Interval time.Duration
FinishedSleep time.Duration
Serving ServingConfig
FailOnErrors bool
}

// Wathola represents options related strictly to wathola testing tool.
type Wathola struct {
ConfigMap
EventsTypePrefix string
HealthEndpoint string
BrokerName string
}

// ConfigMap represents options of wathola config toml file.
type ConfigMap struct {
ConfigMapName string
ConfigMountPoint string
ConfigFilename string
}

// ServingConfig represents a options for serving test component (wathola-forwarder).
type ServingConfig struct {
Use bool
ScaleToZero bool
}

// NewConfig creates a new configuration object with default values filled in.
// Values can be influenced by kelseyhightower/envconfig with
// `e2e_upgrade_tests` prefix.
func NewConfig(namespace string) *Config {
config := &Config{
Namespace: "",
Interval: Interval,
FinishedSleep: defaultFinishedSleep,
FailOnErrors: true,
Serving: ServingConfig{
Use: false,
ScaleToZero: true,
},
Wathola: Wathola{
ConfigMap: ConfigMap{
ConfigMapName: defaultConfigName,
ConfigMountPoint: fmt.Sprintf("%s/%s", defaultHomedir, defaultConfigHomedirPath),
ConfigFilename: defaultConfigFilename,
},
EventsTypePrefix: defaultWatholaEventsPrefix,
HealthEndpoint: defaultHealthEndpoint,
BrokerName: defaultBrokerName,
},
}

// FIXME: remove while fixing https://github.com/knative/eventing/issues/2665
config.FailOnErrors = false

err := envconfig.Process("e2e_upgrade_tests", config)
ensure.NoError(err)
config.Namespace = namespace
return config
}

func (p *prober) deployConfiguration() {
p.deployBroker()
Expand All @@ -53,55 +120,64 @@ func (p *prober) deployConfiguration() {
}

func (p *prober) deployBroker() {
p.client.CreateBrokerV1Beta1OrFail(brokerName)
p.client.CreateBrokerV1Beta1OrFail(p.config.BrokerName)
}

func (p *prober) fetchBrokerUrl() (*apis.URL, error) {
func (p *prober) fetchBrokerURL() (*apis.URL, error) {
namespace := p.config.Namespace
p.log.Debugf("Fetching %s broker URL for ns %s", brokerName, namespace)
meta := resources.NewMetaResource(brokerName, p.config.Namespace, testlib.BrokerTypeMeta)
p.log.Debugf("Fetching %s broker URL for ns %s",
p.config.BrokerName, namespace)
meta := resources.NewMetaResource(
p.config.BrokerName, p.config.Namespace, testlib.BrokerTypeMeta,
)
err := duck.WaitForResourceReady(p.client.Dynamic, meta)
if err != nil {
return nil, err
}
broker, err := p.client.Eventing.EventingV1beta1().Brokers(namespace).Get(context.Background(), brokerName, metav1.GetOptions{})
broker, err := p.client.Eventing.EventingV1beta1().Brokers(namespace).Get(
context.Background(), p.config.BrokerName, metav1.GetOptions{},
)
if err != nil {
return nil, err
}
url := broker.Status.Address.URL
p.log.Debugf("%s broker URL for ns %s is %v", brokerName, namespace, url)
p.log.Debugf("%s broker URL for ns %s is %v",
p.config.BrokerName, namespace, url)
return url, nil
}

func (p *prober) deployConfigMap() {
name := configName
name := p.config.ConfigMapName
p.log.Infof("Deploying config map: \"%s/%s\"", p.config.Namespace, name)
brokerUrl, err := p.fetchBrokerUrl()
brokerURL, err := p.fetchBrokerURL()
ensure.NoError(err)
configData := p.compileTemplate(configFilename, brokerUrl)
p.client.CreateConfigMapOrFail(name, p.config.Namespace, map[string]string{configFilename: configData})
configData := p.compileTemplate(p.config.ConfigFilename, brokerURL)
p.client.CreateConfigMapOrFail(name, p.config.Namespace, map[string]string{
p.config.ConfigFilename: configData,
})
}

func (p *prober) deployTriggers() {
for _, eventType := range eventTypes {
name := fmt.Sprintf("wathola-trigger-%v", eventType)
fullType := fmt.Sprintf("%v.%v", watholaEventNs, eventType)
fullType := fmt.Sprintf("%v.%v", p.config.EventsTypePrefix, eventType)
subscriberOption := resources.WithSubscriberServiceRefForTriggerV1Beta1(receiverName)
if p.config.Serving.Use {
subscriberOption = resources.WithSubscriberKServiceRefForTrigger(forwarderName)
}
p.client.CreateTriggerOrFailV1Beta1(name,
resources.WithBrokerV1Beta1(brokerName),
resources.WithBrokerV1Beta1(p.config.BrokerName),
resources.WithAttributesTriggerFilterV1Beta1(
eventingv1beta1.TriggerAnyFilter,
fullType,
map[string]interface{}{},
),
subscriberOption)
subscriberOption,
)
}
}

func (p *prober) compileTemplate(templateName string, brokerUrl *apis.URL) string {
func (p *prober) compileTemplate(templateName string, brokerURL fmt.Stringer) string {
_, filename, _, _ := runtime.Caller(0)
templateFilepath := path.Join(path.Dir(filename), templateName)
templateBytes, err := ioutil.ReadFile(templateFilepath)
Expand All @@ -111,10 +187,10 @@ func (p *prober) compileTemplate(templateName string, brokerUrl *apis.URL) strin
var buff bytes.Buffer
data := struct {
*Config
BrokerUrl string
BrokerURL string
}{
p.config,
brokerUrl.String(),
brokerURL.String(),
}
ensure.NoError(tmpl.Execute(&buff, data))
return buff.String()
Expand Down
14 changes: 7 additions & 7 deletions test/upgrade/prober/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (
func (p *prober) deployForwarder(ctx context.Context) {
p.log.Infof("Deploy forwarder knative service: %v", forwarderName)
serving := p.client.Dynamic.Resource(resources.KServicesGVR).Namespace(p.client.Namespace)
service := forwarderKService(forwarderName, p.client.Namespace)
service := p.forwarderKService(forwarderName, p.client.Namespace)
_, err := serving.Create(context.Background(), service, metav1.CreateOptions{})
ensure.NoError(err)

Expand All @@ -60,7 +60,7 @@ func (p *prober) removeForwarder() {
ensure.NoError(err)
}

func forwarderKService(name, namespace string) *unstructured.Unstructured {
func (p *prober) forwarderKService(name, namespace string) *unstructured.Unstructured {
obj := map[string]interface{}{
"apiVersion": resources.KServiceType.APIVersion,
"kind": resources.KServiceType.Kind,
Expand All @@ -78,20 +78,20 @@ func forwarderKService(name, namespace string) *unstructured.Unstructured {
"name": "forwarder",
"image": pkgTest.ImagePath(forwarderName),
"volumeMounts": []map[string]interface{}{{
"name": configName,
"mountPath": configMountPoint,
"name": p.config.ConfigMapName,
"mountPath": p.config.ConfigMountPoint,
"readOnly": true,
}},
"readinessProbe": map[string]interface{}{
"httpGet": map[string]interface{}{
"path": healthEndpoint,
"path": p.config.HealthEndpoint,
},
},
}},
"volumes": []map[string]interface{}{{
"name": configName,
"name": p.config.ConfigMapName,
"configMap": map[string]interface{}{
"name": configName,
"name": p.config.ConfigMapName,
},
}},
},
Expand Down
32 changes: 0 additions & 32 deletions test/upgrade/prober/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"testing"
"time"

"github.com/kelseyhightower/envconfig"
"github.com/wavesoftware/go-ensure"
"go.uber.org/zap"
testlib "knative.dev/eventing/test/lib"
Expand Down Expand Up @@ -50,37 +49,6 @@ type Prober interface {
remove()
}

// Config represents a configuration for prober
type Config struct {
Namespace string
Interval time.Duration
Serving ServingConfig
FinishedSleep time.Duration
FailOnErrors bool
}

type ServingConfig struct {
Use bool
ScaleToZero bool
}

func NewConfig(namespace string) *Config {
config := &Config{
Namespace: "",
Interval: Interval,
FinishedSleep: 5 * time.Second,
FailOnErrors: true,
Serving: ServingConfig{
Use: false,
ScaleToZero: true,
},
}
err := envconfig.Process("e2e_upgrade_tests", config)
ensure.NoError(err)
config.Namespace = namespace
return config
}

// RunEventProber starts a single Prober of the given domain.
func RunEventProber(ctx context.Context, log *zap.SugaredLogger, client *testlib.Client, config *Config) Prober {
pm := newProber(log, client, config)
Expand Down
Loading