Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 2 additions & 17 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 0 additions & 7 deletions test/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ limitations under the License.
package test

import (
sources "github.com/knative/eventing-sources/pkg/client/clientset/versioned"
eventing "github.com/knative/eventing/pkg/client/clientset/versioned"
"github.com/knative/pkg/test"
serving "github.com/knative/serving/pkg/client/clientset/versioned"
Expand All @@ -33,7 +32,6 @@ type Clients struct {
Serving *serving.Clientset
Eventing *eventing.Clientset
Dynamic dynamic.Interface
Sources *sources.Clientset
}

// NewClients instantiates and returns several clientsets required for making request to the
Expand Down Expand Up @@ -65,11 +63,6 @@ func NewClients(configPath string, clusterName string, namespace string) (*Clien
return nil, err
}

clients.Sources, err = sources.NewForConfig(cfg)
if err != nil {
return nil, err
}

return clients, nil
}

Expand Down
123 changes: 94 additions & 29 deletions test/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package test
// crd contains functions that construct boilerplate CRD definitions.

import (
sourcesv1alpha1 "github.com/knative/eventing-sources/pkg/apis/sources/v1alpha1"
"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
servingv1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)

// Route returns a Route object in namespace
Expand Down Expand Up @@ -129,21 +129,6 @@ func Channel(name string, namespace string, provisioner *corev1.ObjectReference)
}
}

// KubernetesEventSource returns a KubernetesEventSource sinking to specified channel
func KubernetesEventSource(name string, namespace string, targetNamespace string, serviceAccount string, channel *corev1.ObjectReference) *sourcesv1alpha1.KubernetesEventSource {
return &sourcesv1alpha1.KubernetesEventSource{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: sourcesv1alpha1.KubernetesEventSourceSpec{
Namespace: targetNamespace,
ServiceAccountName: serviceAccount,
Sink: channel,
},
}
}

// SubscriberSpecForRoute returns a SubscriberSpec for a given Knative Service.
func SubscriberSpecForRoute(name string) *v1alpha1.SubscriberSpec {
return &v1alpha1.SubscriberSpec{
Expand All @@ -155,6 +140,17 @@ func SubscriberSpecForRoute(name string) *v1alpha1.SubscriberSpec {
}
}

// SubscriberSpecForService returns a SubscriberSpec for a given Knative Service.
func SubscriberSpecForService(name string) *v1alpha1.SubscriberSpec {
return &v1alpha1.SubscriberSpec{
Ref: &corev1.ObjectReference{
Kind: "Service",
APIVersion: "v1",
Name: name,
},
}
}

// Subscription returns a Subscription
func Subscription(name string, namespace string, channel *corev1.ObjectReference, subscriber *v1alpha1.SubscriberSpec, reply *v1alpha1.ReplyStrategy) *v1alpha1.Subscription {
return &v1alpha1.Subscription{
Expand All @@ -170,26 +166,95 @@ func Subscription(name string, namespace string, channel *corev1.ObjectReference
}
}

// NGinxPod returns nginx pod defined in given namespace
func NGinxPod(namespace string) *corev1.Pod {
// CloudEvent specifices the arguments for a CloudEvent sent by the sendevent
// binary.
type CloudEvent struct {
ID string
Type string
Source string
Data string
Encoding string // binary or structured
}

const (
CloudEventEncodingBinary = "binary"
CloudEventEncodingStructured = "structured"
)

// EventSenderPod creates a Pod that sends a single event to the given address.
func EventSenderPod(name string, namespace string, sink string, event CloudEvent) *corev1.Pod {
if event.Encoding == "" {
event.Encoding = CloudEventEncodingBinary
}

return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "nginx",
Name: name,
Namespace: namespace,
Annotations: map[string]string{"sidecar.istio.io/inject": "true"},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx:1.7.9",
Ports: []corev1.ContainerPort{
{
ContainerPort: 80,
},
},
Containers: []corev1.Container{{
Name: "sendevent",
Image: ImagePath("sendevent"),
ImagePullPolicy: corev1.PullAlways, // TODO: this might not be wanted for local.
Args: []string{
"-event-id",
event.ID,
"-event-type",
event.Type,
"-source",
event.Source,
"-data",
event.Data,
"-encoding",
event.Encoding,
"-sink",
sink,
},
},
}},
//TODO restart on failure?
RestartPolicy: corev1.RestartPolicyNever,
},
}
}

// EventLoggerPod creates a Pod that logs events received.
func EventLoggerPod(name string, namespace string, selector map[string]string) *corev1.Pod {
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: selector,
Annotations: map[string]string{"sidecar.istio.io/inject": "true"},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "logevents",
Image: ImagePath("logevents"),
ImagePullPolicy: corev1.PullAlways, // TODO: this might not be wanted for local.
}},
RestartPolicy: corev1.RestartPolicyAlways,
},
}
}

// Service creates a Kubernetes Service with the given name, namespace, and
// selector. Port 8080 is assumed the target port.
func Service(name string, namespace string, selector map[string]string) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Spec: corev1.ServiceSpec{
Selector: selector,
Ports: []corev1.ServicePort{{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromInt(8080),
}},
},
}
}
47 changes: 43 additions & 4 deletions test/crd_checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,25 @@ import (
"fmt"
"time"

servingV1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1"
servingtyped "github.com/knative/serving/pkg/client/clientset/versioned/typed/serving/v1alpha1"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
eventingclient "github.com/knative/eventing/pkg/client/clientset/versioned/typed/eventing/v1alpha1"
servingv1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1"
servingclient "github.com/knative/serving/pkg/client/clientset/versioned/typed/serving/v1alpha1"
"go.opencensus.io/trace"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)

const (
interval = 1 * time.Second
timeout = 5 * time.Minute
timeout = 1 * time.Minute
)

// WaitForRouteState polls the status of the Route called name from client every
// interval until inState returns `true` indicating it is done, returns an
// error or timeout. desc will be used to name the metric that is emitted to
// track how long it took for name to get into the state checked by inState.
func WaitForRouteState(client servingtyped.RouteInterface, name string, inState func(r *servingV1alpha1.Route) (bool, error), desc string) error {
func WaitForRouteState(client servingclient.RouteInterface, name string, inState func(r *servingv1alpha1.Route) (bool, error), desc string) error {
metricName := fmt.Sprintf("WaitForRouteState/%s/%s", name, desc)
_, span := trace.StartSpan(context.Background(), metricName)
defer span.End()
Expand All @@ -52,3 +54,40 @@ func WaitForRouteState(client servingtyped.RouteInterface, name string, inState
return inState(r)
})
}

// WaitForChannelState polls the status of the Channel called name from client
// every interval until inState returns `true` indicating it is done, returns an
// error or timeout. desc will be used to name the metric that is emitted to
// track how long it took for name to get into the state checked by inState.
func WaitForChannelState(client eventingclient.ChannelInterface, name string, inState func(r *eventingv1alpha1.Channel) (bool, error), desc string) error {
metricName := fmt.Sprintf("WaitForChannelState/%s/%s", name, desc)
_, span := trace.StartSpan(context.Background(), metricName)
defer span.End()

return wait.PollImmediate(interval, timeout, func() (bool, error) {
r, err := client.Get(name, metav1.GetOptions{})
if err != nil {
return true, err
}
return inState(r)
})
}

// WaitForSubscriptionState polls the status of the Subscription called name
// from client every interval until inState returns `true` indicating it is
// done, returns an error or timeout. desc will be used to name the metric that
// is emitted to track how long it took for name to get into the state checked
// by inState.
func WaitForSubscriptionState(client eventingclient.SubscriptionInterface, name string, inState func(r *eventingv1alpha1.Subscription) (bool, error), desc string) error {
metricName := fmt.Sprintf("WaitForSubscriptionState/%s/%s", name, desc)
_, span := trace.StartSpan(context.Background(), metricName)
defer span.End()

return wait.PollImmediate(interval, timeout, func() (bool, error) {
r, err := client.Get(name, metav1.GetOptions{})
if err != nil {
return true, err
}
return inState(r)
})
}
35 changes: 5 additions & 30 deletions test/e2e-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# This script runs the end-to-end tests against the eventing
# built from source.
# This script runs the end-to-end tests against eventing built from source.

# If you already have the *_OVERRIDE environment variables set, call
# this script with the --run-tests arguments and it will use the cluster
Expand All @@ -27,49 +26,28 @@

source $(dirname $0)/../vendor/github.com/knative/test-infra/scripts/e2e-tests.sh

# Using the most recent good release of eventing-sources to unblock tests. This
# should be replaced with the commented line below when eventing-sources nightly
# is known good again.
readonly KNATIVE_EVENTING_SOURCES_RELEASE=https://knative-nightly.storage.googleapis.com/eventing-sources/previous/v20181205-fbac942/release.yaml
#readonly KNATIVE_EVENTING_SOURCES_RELEASE=https://knative-nightly.storage.googleapis.com/eventing-sources/latest/release.yaml

# Names of the Resources used in the tests.
readonly E2E_TEST_NAMESPACE=e2etest
readonly E2E_TEST_FUNCTION_NAMESPACE=e2etestfn3
# Currently this namespace must be the same as the namespace specified in
# test/e2e/e2e.go.
readonly E2E_TEST_NAMESPACE=e2etest-knative-eventing

# Helper functions.

# Install the latest stable Knative/serving in the current cluster.
function start_latest_eventing_sources() {
subheader "Installing Knative Eventing Sources"
kubectl apply -f ${KNATIVE_EVENTING_SOURCES_RELEASE} || return 1
wait_until_pods_running knative-sources || return 1
}


function teardown() {
teardown_events_test_resources
ko delete --ignore-not-found=true -f config/
ko delete --ignore-not-found=true -f ${KNATIVE_EVENTING_SOURCES_RELEASE}

wait_until_object_does_not_exist namespaces knative-eventing
wait_until_object_does_not_exist namespaces knative-sources

wait_until_object_does_not_exist customresourcedefinitions subscriptions.eventing.knative.dev
wait_until_object_does_not_exist customresourcedefinitions channels.eventing.knative.dev
}

function setup_events_test_resources() {
kubectl create namespace ${E2E_TEST_NAMESPACE} || return 1
kubectl create namespace ${E2E_TEST_FUNCTION_NAMESPACE}
kubectl create namespace ${E2E_TEST_NAMESPACE}
}

function teardown_events_test_resources() {
# Delete the function namespace
echo "Deleting namespace ${E2E_TEST_FUNCTION_NAMESPACE}"
kubectl --ignore-not-found=true delete namespace ${E2E_TEST_FUNCTION_NAMESPACE}
wait_until_object_does_not_exist namespaces ${E2E_TEST_FUNCTION_NAMESPACE} || return 1

# Delete the test namespace
echo "Deleting namespace $E2E_TEST_NAMESPACE"
kubectl --ignore-not-found=true delete namespace ${E2E_TEST_NAMESPACE}
Expand All @@ -91,9 +69,6 @@ if (( ! USING_EXISTING_CLUSTER )); then
start_latest_knative_serving || fail_test "Serving did not come up"
fi

# Install Knative Eventing Sources
start_latest_eventing_sources || fail_test "Eventing Sources did not come up"

# Clean up anything that might still be around
teardown_events_test_resources || fail_test "Error cleaning up test resources"

Expand Down
Loading