Skip to content
Merged
45 changes: 39 additions & 6 deletions pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/apis/messaging"
"knative.dev/eventing/pkg/apis/messaging/v1beta1"
channelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/channel"
listers "knative.dev/eventing/pkg/client/listers/messaging/v1beta1"
Expand Down Expand Up @@ -90,13 +92,45 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, c *v1beta1.Channel) pkgr
}

c.Status.Channel = &backingChannelObjRef
c.Status.PropagateStatuses(&backingChannel.Status)
bCS := r.getChannelableStatus(ctx, &backingChannel.Status, backingChannel.Annotations)
c.Status.PropagateStatuses(bCS)

return newReconciledNormal(c.Namespace, c.Name)
}

func (r *Reconciler) getChannelableStatus(ctx context.Context, bc *duckv1alpha1.ChannelableCombinedStatus, cAnnotations map[string]string) *duckv1beta1.ChannelableStatus {

channelableStatus := &duckv1beta1.ChannelableStatus{}
if bc.AddressStatus.Address != nil {
channelableStatus.AddressStatus.Address = &duckv1.Addressable{}
bc.AddressStatus.Address.ConvertTo(ctx, channelableStatus.AddressStatus.Address)
}
channelableStatus.Status = bc.Status
if cAnnotations != nil {
if cAnnotations[messaging.SubscribableDuckVersionAnnotation] == "v1beta1" {
if len(bc.SubscribableStatus.Subscribers) > 0 {
channelableStatus.SubscribableStatus.Subscribers = bc.SubscribableStatus.Subscribers
}
} else { //v1alpha1
if bc.SubscribableTypeStatus.SubscribableStatus != nil &&
len(bc.SubscribableTypeStatus.SubscribableStatus.Subscribers) > 0 {
channelableStatus.SubscribableStatus.Subscribers = make([]duckv1beta1.SubscriberStatus, len(bc.SubscribableTypeStatus.SubscribableStatus.Subscribers))
for i, ss := range bc.SubscribableTypeStatus.SubscribableStatus.Subscribers {
channelableStatus.SubscribableStatus.Subscribers[i] = duckv1beta1.SubscriberStatus{
UID: ss.UID,
ObservedGeneration: ss.ObservedGeneration,
Ready: ss.Ready,
Message: ss.Message,
}
}
}
}
}
return channelableStatus
}

// reconcileBackingChannel reconciles Channel's 'c' underlying CRD channel.
func (r *Reconciler) reconcileBackingChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, c *v1beta1.Channel, backingChannelObjRef duckv1.KReference) (*duckv1beta1.Channelable, error) {
func (r *Reconciler) reconcileBackingChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, c *v1beta1.Channel, backingChannelObjRef duckv1.KReference) (*duckv1alpha1.ChannelableCombined, error) {
lister, err := r.channelableTracker.ListerForKReference(backingChannelObjRef)
Comment thread
lberk marked this conversation as resolved.
if err != nil {
logging.FromContext(ctx).Error("Error getting lister for Channel", zap.Any("backingChannel", backingChannelObjRef), zap.Error(err))
Expand All @@ -118,7 +152,7 @@ func (r *Reconciler) reconcileBackingChannel(ctx context.Context, channelResourc
return nil, err
}
logging.FromContext(ctx).Debug("Created backing Channel", zap.Any("backingChannel", newBackingChannel))
channelable := &duckv1beta1.Channelable{}
channelable := &duckv1alpha1.ChannelableCombined{}
err = duckapis.FromUnstructured(created, channelable)
if err != nil {
logging.FromContext(ctx).Error("Failed to convert to Channelable Object", zap.Any("backingChannel", backingChannelObjRef), zap.Any("createdChannel", created), zap.Error(err))
Expand All @@ -131,10 +165,9 @@ func (r *Reconciler) reconcileBackingChannel(ctx context.Context, channelResourc
return nil, err
}
logging.FromContext(ctx).Debug("Found backing Channel", zap.Any("backingChannel", backingChannelObjRef))
channelable, ok := backingChannel.(*duckv1beta1.Channelable)
channelable, ok := backingChannel.(*duckv1alpha1.ChannelableCombined)
Comment thread
lberk marked this conversation as resolved.
if !ok {
logging.FromContext(ctx).Error("Failed to convert to Channelable Object", zap.Any("backingChannel", backingChannel), zap.Error(err))
return nil, err
return nil, fmt.Errorf("Failed to convert to Channelable Object %+v", backingChannel)
}
return channelable, nil
}
7 changes: 4 additions & 3 deletions pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
clientgotesting "k8s.io/client-go/testing"
eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1beta1/channelable"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined"
channelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/channel"
"knative.dev/eventing/pkg/duck"
. "knative.dev/eventing/pkg/reconciler/testing"
Expand Down Expand Up @@ -226,6 +226,7 @@ func TestReconcile(t *testing.T) {
WithChannelAddressV1Beta1(backingChannelHostname)),
NewInMemoryChannelV1Beta1(channelName, testNS,
WithInitInMemoryChannelConditionsV1Beta1,
WithInMemoryChannelDuckAnnotationV1Beta1,
WithInMemoryChannelDeploymentReadyV1Beta1(),
WithInMemoryChannelServiceReadyV1Beta1(),
WithInMemoryChannelEndpointsReadyV1Beta1(),
Expand All @@ -250,11 +251,11 @@ func TestReconcile(t *testing.T) {

logger := logtesting.TestLogger(t)
table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler {
ctx = channelable.WithDuck(ctx)
ctx = channelablecombined.WithDuck(ctx)
r := &Reconciler{
dynamicClientSet: fakedynamicclient.Get(ctx),
channelLister: listers.GetV1Beta1MessagingChannelLister(),
channelableTracker: duck.NewListableTracker(ctx, channelable.Get, func(types.NamespacedName) {}, 0),
channelableTracker: duck.NewListableTracker(ctx, channelablecombined.Get, func(types.NamespacedName) {}, 0),
}
return channelreconciler.NewReconciler(ctx, logger,
fakeeventingclient.Get(ctx), listers.GetV1Beta1MessagingChannelLister(),
Expand Down
4 changes: 2 additions & 2 deletions pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/client/injection/ducks/duck/v1beta1/channelable"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined"
channelinformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/channel"
channelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1beta1/channel"
"knative.dev/eventing/pkg/duck"
Expand All @@ -44,7 +44,7 @@ func NewController(
}
impl := channelreconciler.NewImpl(ctx, r)

r.channelableTracker = duck.NewListableTracker(ctx, channelable.Get, impl.EnqueueKey, controller.GetTrackerLease(ctx))
r.channelableTracker = duck.NewListableTracker(ctx, channelablecombined.Get, impl.EnqueueKey, controller.GetTrackerLease(ctx))

logging.FromContext(ctx).Info("Setting up event handlers")

Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/channel/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
. "knative.dev/pkg/reconciler/testing"

// Fake injection informers
_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1beta1/channelable/fake"
_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1alpha1/channelablecombined/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1beta1/channel/fake"
_ "knative.dev/pkg/client/injection/kube/client/fake"
_ "knative.dev/pkg/injection/clients/dynamicclient/fake"
Expand Down
9 changes: 9 additions & 0 deletions pkg/reconciler/testing/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/eventing/pkg/apis/eventing"
"knative.dev/eventing/pkg/apis/messaging"
"knative.dev/eventing/pkg/apis/messaging/v1alpha1"
"knative.dev/eventing/pkg/apis/messaging/v1beta1"
"knative.dev/pkg/apis"
Expand Down Expand Up @@ -56,6 +57,14 @@ func WithInitInMemoryChannelConditionsV1Beta1(imc *v1beta1.InMemoryChannel) {
imc.Status.InitializeConditions()
}

func WithInMemoryChannelDuckAnnotationV1Beta1(imc *v1beta1.InMemoryChannel) {
annotations := map[string]string{
messaging.SubscribableDuckVersionAnnotation: "v1beta1",
}
imc.ObjectMeta.SetAnnotations(annotations)

}

func WithInMemoryChannelGenerationV1Beta1(gen int64) InMemoryChannelOptionV1Beta1 {
return func(s *v1beta1.InMemoryChannel) {
s.Generation = gen
Expand Down