Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pkg/reconciler/v1alpha1/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"fmt"
"testing"

"sigs.k8s.io/controller-runtime/pkg/client/fake"

eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
controllertesting "github.com/knative/eventing/pkg/reconciler/testing"
Expand All @@ -33,6 +31,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Comment thread
Harwayne marked this conversation as resolved.
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

Expand Down
111 changes: 9 additions & 102 deletions pkg/reconciler/v1alpha1/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@ package subscription
import (
"context"
"fmt"
"net/url"

eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1"
"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/logging"
"github.com/knative/eventing/pkg/reconciler/names"
duckapis "github.com/knative/pkg/apis"
"github.com/knative/eventing/pkg/utils/resolve"
"github.com/knative/pkg/apis/duck"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
"go.uber.org/zap"
Expand Down Expand Up @@ -87,7 +85,7 @@ func ProvideController(mgr manager.Manager, logger *zap.Logger) (controller.Cont
}

// Watch Subscription events and enqueue Subscription object key.
if err := c.Watch(&source.Kind{Type: &v1alpha1.Subscription{}}, &handler.EnqueueRequestForObject{}); err != nil {
if err = c.Watch(&source.Kind{Type: &v1alpha1.Subscription{}}, &handler.EnqueueRequestForObject{}); err != nil {
return nil, err
}

Expand Down Expand Up @@ -153,16 +151,15 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc
}

// Verify that `channel` exists.
_, err := fetchObjectReference(ctx, r.dynamicClient, subscription.Namespace, &subscription.Spec.Channel)
if err != nil {
if _, err := resolve.ObjectReference(ctx, r.dynamicClient, subscription.Namespace, &subscription.Spec.Channel); err != nil {
logging.FromContext(ctx).Warn("Failed to validate Channel exists",
zap.Error(err),
zap.Any("channel", subscription.Spec.Channel))
r.recorder.Eventf(subscription, corev1.EventTypeWarning, channelReferenceFetchFailed, "Failed to validate spec.channel exists: %v", err)
return err
}

if subscriberURI, err := ResolveSubscriberSpec(ctx, r.client, r.dynamicClient, subscription.Namespace, subscription.Spec.Subscriber); err != nil {
if subscriberURI, err := resolve.SubscriberSpec(ctx, r.dynamicClient, subscription.Namespace, subscription.Spec.Subscriber); err != nil {
logging.FromContext(ctx).Warn("Failed to resolve Subscriber",
zap.Error(err),
zap.Any("subscriber", subscription.Spec.Subscriber))
Expand All @@ -189,8 +186,7 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc

// Ok, now that we have the Channel and at least one of the Call/Result, let's reconcile
// the Channel with this information.
err = r.syncPhysicalChannel(ctx, subscription, false)
if err != nil {
if err := r.syncPhysicalChannel(ctx, subscription, false); err != nil {
logging.FromContext(ctx).Warn("Failed to sync physical Channel", zap.Error(err))
r.recorder.Eventf(subscription, corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err)
return err
Expand All @@ -201,10 +197,6 @@ func (r *reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc
return nil
}

func isNilOrEmptySubscriber(sub *v1alpha1.SubscriberSpec) bool {
return sub == nil || equality.Semantic.DeepEqual(sub, &v1alpha1.SubscriberSpec{})
}

func isNilOrEmptyReply(reply *v1alpha1.ReplyStrategy) bool {
return reply == nil || equality.Semantic.DeepEqual(reply, &v1alpha1.ReplyStrategy{})
}
Expand Down Expand Up @@ -248,67 +240,12 @@ func (r *reconciler) updateStatus(ctx context.Context, subscription *v1alpha1.Su
return latestSubscription, nil
}

// ResolveSubscriberSpec resolves the Spec.Call object. If it's an
// ObjectReference will resolve the object and treat it as an Addressable. If
// it's DNSName then it's used as is.
// TODO: Once Service Routes, etc. support Callable, use that.
//
func ResolveSubscriberSpec(ctx context.Context, client client.Client, dynamicClient dynamic.Interface, namespace string, s *v1alpha1.SubscriberSpec) (string, error) {
if isNilOrEmptySubscriber(s) {
return "", nil
}
if s.DNSName != nil && *s.DNSName != "" {
return *s.DNSName, nil
}

// K8s services are special cased. They can be called, even though they do not satisfy the
// Callable interface.
if s.Ref != nil && s.Ref.APIVersion == "v1" && s.Ref.Kind == "Service" {
svc := &corev1.Service{}
svcKey := types.NamespacedName{
Namespace: namespace,
Name: s.Ref.Name,
}
err := client.Get(ctx, svcKey, svc)
if err != nil {
logging.FromContext(ctx).Warn("Failed to fetch SubscriberSpec target as a K8s Service",
zap.Error(err),
zap.Any("subscriberSpec.Ref", s.Ref))
return "", err
}
return domainToURL(names.ServiceHostName(svc.Name, svc.Namespace)), nil
}

obj, err := fetchObjectReference(ctx, dynamicClient, namespace, s.Ref)
if err != nil {
logging.FromContext(ctx).Warn("Failed to fetch SubscriberSpec target",
zap.Error(err),
zap.Any("subscriberSpec.Ref", s.Ref))
return "", err
}
t := duckv1alpha1.AddressableType{}
if err := duck.FromUnstructured(obj, &t); err == nil {
if t.Status.Address != nil {
return domainToURL(t.Status.Address.Hostname), nil
}
}

legacy := duckv1alpha1.LegacyTarget{}
if err := duck.FromUnstructured(obj, &legacy); err == nil {
if legacy.Status.DomainInternal != "" {
return domainToURL(legacy.Status.DomainInternal), nil
}
}

return "", fmt.Errorf("status does not contain address")
}

// resolveResult resolves the Spec.Result object.
func (r *reconciler) resolveResult(ctx context.Context, namespace string, replyStrategy *v1alpha1.ReplyStrategy) (string, error) {
if isNilOrEmptyReply(replyStrategy) {
return "", nil
}
obj, err := fetchObjectReference(ctx, r.dynamicClient, namespace, replyStrategy.Channel)
obj, err := resolve.ObjectReference(ctx, r.dynamicClient, namespace, replyStrategy.Channel)
if err != nil {
logging.FromContext(ctx).Warn("Failed to fetch ReplyStrategy Channel",
zap.Error(err),
Expand All @@ -322,31 +259,11 @@ func (r *reconciler) resolveResult(ctx context.Context, namespace string, replyS
return "", err
}
if s.Status.Address != nil {
return domainToURL(s.Status.Address.Hostname), nil
return resolve.DomainToURL(s.Status.Address.Hostname), nil
}
return "", fmt.Errorf("status does not contain address")
}

// fetchObjectReference fetches an object based on ObjectReference.
func fetchObjectReference(ctx context.Context, dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (duck.Marshalable, error) {
resourceClient, err := createResourceInterface(dynamicClient, namespace, ref)
if err != nil {
logging.FromContext(ctx).Warn("Failed to create dynamic resource client", zap.Error(err))
return nil, err
}

return resourceClient.Get(ref.Name, metav1.GetOptions{})
}

func domainToURL(domain string) string {
u := url.URL{
Scheme: "http",
Host: domain,
Path: "/",
}
return u.String()
}

func (r *reconciler) syncPhysicalChannel(ctx context.Context, sub *v1alpha1.Subscription, isDeleted bool) error {
logging.FromContext(ctx).Debug("Reconciling physical from Channel", zap.Any("sub", sub))

Expand Down Expand Up @@ -428,7 +345,7 @@ func (r *reconciler) createSubscribable(subs []v1alpha1.Subscription) *eventingd

func (r *reconciler) patchPhysicalFrom(ctx context.Context, namespace string, physicalFrom corev1.ObjectReference, subs *eventingduck.Subscribable) error {
// First get the original object and convert it to only the bits we care about
s, err := fetchObjectReference(ctx, r.dynamicClient, namespace, &physicalFrom)
s, err := resolve.ObjectReference(ctx, r.dynamicClient, namespace, &physicalFrom)
if err != nil {
return err
}
Expand All @@ -452,7 +369,7 @@ func (r *reconciler) patchPhysicalFrom(ctx context.Context, namespace string, ph
return err
}

resourceClient, err := createResourceInterface(r.dynamicClient, namespace, &physicalFrom)
resourceClient, err := resolve.ResourceInterface(r.dynamicClient, namespace, &physicalFrom)
if err != nil {
logging.FromContext(ctx).Warn("Failed to create dynamic resource client", zap.Error(err))
return err
Expand All @@ -466,16 +383,6 @@ func (r *reconciler) patchPhysicalFrom(ctx context.Context, namespace string, ph
return nil
}

func createResourceInterface(dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (dynamic.ResourceInterface, error) {
rc := dynamicClient.Resource(duckapis.KindToResource(ref.GroupVersionKind()))

if rc == nil {
return nil, fmt.Errorf("failed to create dynamic client resource")
}
return rc.Namespace(namespace), nil

}

func addFinalizer(sub *v1alpha1.Subscription) {
finalizers := sets.NewString(sub.Finalizers...)
finalizers.Insert(finalizerName)
Expand Down
Loading