Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions pkg/reconciler/v1alpha1/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,14 @@ func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error {
}

// Verify that `channel` exists.
_, err := r.fetchObjectReference(subscription.Namespace, &subscription.Spec.Channel)
_, err := fetchObjectReference(r.dynamicClient, subscription.Namespace, &subscription.Spec.Channel)
if err != nil {
glog.Warningf("Failed to validate `channel` exists: %+v, %v", subscription.Spec.Channel, err)
r.recorder.Eventf(subscription, corev1.EventTypeWarning, channelReferenceFetchFailed, "Failed to validate spec.channel exists: %v", err)
return err
}

if subscriberURI, err := r.resolveSubscriberSpec(subscription.Namespace, subscription.Spec.Subscriber); err != nil {
if subscriberURI, err := ResolveSubscriberSpec(context.TODO(), r.client, r.dynamicClient, subscription.Namespace, subscription.Spec.Subscriber); err != nil {
glog.Warningf("Failed to resolve Subscriber %+v : %s", *subscription.Spec.Subscriber, err)
r.recorder.Eventf(subscription, corev1.EventTypeWarning, subscriberResolveFailed, "Failed to resolve spec.subscriber: %v", err)
return err
Expand Down Expand Up @@ -240,12 +240,12 @@ func (r *reconciler) updateStatus(subscription *v1alpha1.Subscription) (*v1alpha
return latestSubscription, nil
}

// resolveSubscriberSpec resolves the Spec.Call object. If it's an
// ObjectReference will resolve the object and treat it as a Callable. If
// ResolveSubscriberSpec resolves the Spec.Call object. If it's an
// ObjectReference will resolve the object and treat it as an Addressable. If
// it's DNSName then it's used as is.
// TODO: Once Service Routes, etc. support Callable, use that.
//
func (r *reconciler) resolveSubscriberSpec(namespace string, s *v1alpha1.SubscriberSpec) (string, error) {
func ResolveSubscriberSpec(ctx context.Context, client client.Client, dynamicClient dynamic.Interface, namespace string, s *v1alpha1.SubscriberSpec) (string, error) {
if isNilOrEmptySubscriber(s) {
return "", nil
}
Expand All @@ -261,15 +261,15 @@ func (r *reconciler) resolveSubscriberSpec(namespace string, s *v1alpha1.Subscri
Namespace: namespace,
Name: s.Ref.Name,
}
err := r.client.Get(context.TODO(), svcKey, svc)
err := client.Get(ctx, svcKey, svc)
if err != nil {
glog.Warningf("Failed to fetch SubscriberSpec target as a K8s Service %+v: %s", s.Ref, err)
return "", err
}
return domainToURL(names.ServiceHostName(svc.Name, svc.Namespace)), nil
}

obj, err := r.fetchObjectReference(namespace, s.Ref)
obj, err := fetchObjectReference(dynamicClient, namespace, s.Ref)
if err != nil {
glog.Warningf("Failed to fetch SubscriberSpec target %+v: %s", s.Ref, err)
return "", err
Expand All @@ -296,7 +296,7 @@ func (r *reconciler) resolveResult(namespace string, replyStrategy *v1alpha1.Rep
if isNilOrEmptyReply(replyStrategy) {
return "", nil
}
obj, err := r.fetchObjectReference(namespace, replyStrategy.Channel)
obj, err := fetchObjectReference(r.dynamicClient, namespace, replyStrategy.Channel)
if err != nil {
glog.Warningf("Failed to fetch ReplyStrategy channel %+v: %s", replyStrategy, err)
return "", err
Expand All @@ -314,8 +314,8 @@ func (r *reconciler) resolveResult(namespace string, replyStrategy *v1alpha1.Rep
}

// fetchObjectReference fetches an object based on ObjectReference.
func (r *reconciler) fetchObjectReference(namespace string, ref *corev1.ObjectReference) (duck.Marshalable, error) {
resourceClient, err := r.CreateResourceInterface(namespace, ref)
func fetchObjectReference(dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (duck.Marshalable, error) {
resourceClient, err := createResourceInterface(dynamicClient, namespace, ref)
if err != nil {
glog.Warningf("failed to create dynamic client resource: %v", err)
return nil, err
Expand Down Expand Up @@ -415,7 +415,7 @@ func (r *reconciler) createSubscribable(subs []v1alpha1.Subscription) *eventingd

func (r *reconciler) patchPhysicalFrom(namespace string, physicalFrom corev1.ObjectReference, subs *eventingduck.Subscribable) error {
// First get the original object and convert it to only the bits we care about
s, err := r.fetchObjectReference(namespace, &physicalFrom)
s, err := fetchObjectReference(r.dynamicClient, namespace, &physicalFrom)
if err != nil {
return err
}
Expand All @@ -439,7 +439,7 @@ func (r *reconciler) patchPhysicalFrom(namespace string, physicalFrom corev1.Obj
return err
}

resourceClient, err := r.CreateResourceInterface(namespace, &physicalFrom)
resourceClient, err := createResourceInterface(r.dynamicClient, namespace, &physicalFrom)
if err != nil {
glog.Warningf("failed to create dynamic client resource: %v", err)
return err
Expand All @@ -454,8 +454,8 @@ func (r *reconciler) patchPhysicalFrom(namespace string, physicalFrom corev1.Obj
return nil
}

func (r *reconciler) CreateResourceInterface(namespace string, ref *corev1.ObjectReference) (dynamic.ResourceInterface, error) {
rc := r.dynamicClient.Resource(duckapis.KindToResource(ref.GroupVersionKind()))
func createResourceInterface(dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (dynamic.ResourceInterface, error) {
rc := dynamicClient.Resource(duckapis.KindToResource(ref.GroupVersionKind()))

if rc == nil {
return nil, fmt.Errorf("failed to create dynamic client resource")
Expand Down