Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,10 @@ rules:
- get
- list
- watch
# Updates the status to reflect subscribable status.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need something for finalizers, like:

- apiGroups:
- eventing.knative.dev
resources:
- channels/finalizers
- clusterchannelprovisioners/finalizers
verbs:
- update

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My changes should not require anything about that, just that the dispatcher needs to update /status. Or mayhaps I misunderstand what you meant.

- apiGroups:
- messaging.knative.dev
resources:
- inmemorychannels/status
verbs:
- update
4 changes: 4 additions & 0 deletions pkg/apis/messaging/v1alpha1/in_memory_channel_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"github.com/knative/pkg/apis"
"github.com/knative/pkg/apis/duck/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)
Expand Down Expand Up @@ -68,6 +69,9 @@ func (imcs *InMemoryChannelStatus) InitializeConditions() {

// TODO: Use the new beta duck types.
func (imcs *InMemoryChannelStatus) SetAddress(url *apis.URL) {
if imcs.Address == nil {
imcs.Address = &v1alpha1.Addressable{}
}
if url != nil {
imcs.Address.Hostname = url.Host
imcs.Address.URL = url
Expand Down
15 changes: 9 additions & 6 deletions pkg/apis/messaging/v1alpha1/in_memory_channel_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,19 +340,22 @@ func TestInMemoryChannelStatus_SetAddressable(t *testing.T) {
},
},
},
AddressStatus: duckv1alpha1.AddressStatus{Address: &duckv1alpha1.Addressable{}},
},
},
"has domain": {
url: &apis.URL{Scheme: "http", Host: "test-domain"},
want: &InMemoryChannelStatus{
Address: duckv1alpha1.Addressable{
Addressable: duckv1beta1.Addressable{
URL: &apis.URL{
Scheme: "http",
Host: "test-domain",
AddressStatus: duckv1alpha1.AddressStatus{
Address: &duckv1alpha1.Addressable{
duckv1beta1.Addressable{
URL: &apis.URL{
Scheme: "http",
Host: "test-domain",
},
},
"test-domain",
},
Hostname: "test-domain",
},
Status: duckv1beta1.Status{
Conditions: []apis.Condition{{
Expand Down
5 changes: 4 additions & 1 deletion pkg/apis/messaging/v1alpha1/in_memory_channel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ type InMemoryChannelStatus struct {
// provided targets from inside the cluster.
//
// It generally has the form {channel}.{namespace}.svc.{cluster domain name}
Address duckv1alpha1.Addressable `json:"address,omitempty"`
duckv1alpha1.AddressStatus `json:",inline"`

// Subscribers is populated with the statuses of each of the Channelable's subscribers.
eventingduck.SubscribableTypeStatus `json:",inline"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
3 changes: 2 additions & 1 deletion pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 76 additions & 1 deletion pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package controller

import (
"context"
"fmt"
"reflect"

"github.com/knative/eventing/pkg/inmemorychannel"

eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1"
"github.com/knative/eventing/pkg/apis/messaging/v1alpha1"
messaginginformers "github.com/knative/eventing/pkg/client/informers/externalversions/messaging/v1alpha1"
listers "github.com/knative/eventing/pkg/client/listers/messaging/v1alpha1"
Expand All @@ -29,6 +32,9 @@ import (
"github.com/knative/eventing/pkg/provisioners/multichannelfanout"
"github.com/knative/eventing/pkg/reconciler"
"github.com/knative/pkg/controller"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
)
Expand Down Expand Up @@ -81,12 +87,45 @@ func NewController(

func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
// Convert the namespace/name string into a distinct namespace and name.
_, _, err := cache.SplitMetaNamespaceKey(key)
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
logging.FromContext(ctx).Error("invalid resource key")
return nil
}

// Get the IMC resource with this namespace/name.
original, err := r.inmemorychannelLister.InMemoryChannels(namespace).Get(name)
if apierrs.IsNotFound(err) {
// The resource may no longer exist, in which case we stop processing.
logging.FromContext(ctx).Error("InMemoryChannel key in work queue no longer exists")
return nil
} else if err != nil {
return err
}

if !original.Status.IsReady() {
return fmt.Errorf("Channel is not ready. Cannot configure and update subscriber status")
}

// Don't modify the informers copy.
channel := original.DeepCopy()

reconcileErr := r.reconcile(ctx, channel)
if reconcileErr != nil {
logging.FromContext(ctx).Error("Error reconciling InMemoryChannel", zap.Error(reconcileErr))
} else {
logging.FromContext(ctx).Debug("InMemoryChannel reconciled")
}

// todo: Should this check for subscribable status rather than entire status?
if _, updateStatusErr := r.updateStatus(ctx, channel); updateStatusErr != nil {
logging.FromContext(ctx).Error("Failed to update InMemoryChannel status", zap.Error(updateStatusErr))
return updateStatusErr
}
return nil
}

func (r *Reconciler) reconcile(ctx context.Context, imc *v1alpha1.InMemoryChannel) error {
// This is a special Reconciler that does the following:
// 1. Lists the inmemory channels.
// 2. Creates a multi-channel-fanout-config.
Expand All @@ -112,9 +151,28 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
return err
}

imc.Status.SubscribableTypeStatus.SubscribableStatus = r.createSubscribableStatus(imc.Spec.Subscribable)
return nil
}

func (r *Reconciler) createSubscribableStatus(subscribable *eventingduck.Subscribable) *eventingduck.SubscribableStatus {
if subscribable == nil {
return nil
}
subscriberStatus := make([]eventingduck.SubscriberStatus, 0)
for _, sub := range subscribable.Subscribers {
status := eventingduck.SubscriberStatus{
UID: sub.UID,
ObservedGeneration: sub.Generation,
Ready: corev1.ConditionTrue,
}
subscriberStatus = append(subscriberStatus, status)
}
return &eventingduck.SubscribableStatus{
Subscribers: subscriberStatus,
}
}

// newConfigFromInMemoryChannels creates a new Config from the list of inmemory channels.
func (r *Reconciler) newConfigFromInMemoryChannels(channels []*v1alpha1.InMemoryChannel) *multichannelfanout.Config {
cc := make([]multichannelfanout.ChannelConfig, 0)
Expand All @@ -136,3 +194,20 @@ func (r *Reconciler) newConfigFromInMemoryChannels(channels []*v1alpha1.InMemory
ChannelConfigs: cc,
}
}

func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.InMemoryChannel) (*v1alpha1.InMemoryChannel, error) {
imc, err := r.inmemorychannelLister.InMemoryChannels(desired.Namespace).Get(desired.Name)
if err != nil {
return nil, err
}

if reflect.DeepEqual(imc.Status, desired.Status) {
return imc, nil
}

// Don't modify the informers copy.
existing := imc.DeepCopy()
existing.Status = desired.Status

return r.EventingClientSet.MessagingV1alpha1().InMemoryChannels(desired.Namespace).UpdateStatus(existing)
}
49 changes: 49 additions & 0 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/knative/eventing/pkg/provisioners/multichannelfanout"
"k8s.io/apimachinery/pkg/runtime"

duckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1"
"github.com/knative/eventing/pkg/apis/messaging/v1alpha1"
fakeclientset "github.com/knative/eventing/pkg/client/clientset/versioned/fake"
informers "github.com/knative/eventing/pkg/client/informers/externalversions"
Expand All @@ -33,6 +34,7 @@ import (
. "github.com/knative/pkg/reconciler/testing"
fakekubeclientset "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
clientgotesting "k8s.io/client-go/testing"
)

const (
Expand Down Expand Up @@ -74,6 +76,28 @@ func TestNewController(t *testing.T) {
}

func TestAllCases(t *testing.T) {
subscribers := []duckv1alpha1.SubscriberSpec{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 1,
SubscriberURI: "call1",
ReplyURI: "sink2",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 2,
SubscriberURI: "call2",
ReplyURI: "sink2",
}}

subscriberStatuses := []duckv1alpha1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: "True",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 2,
Ready: "True",
}}

imcKey := testNS + "/" + imcName
table := TableTest{
{
Expand All @@ -97,6 +121,31 @@ func TestAllCases(t *testing.T) {
reconciletesting.WithInMemoryChannelAddress(channelServiceAddress)),
},
WantErr: false,
}, {
Name: "with subscribers",
Key: imcKey,
Objects: []runtime.Object{
reconciletesting.NewInMemoryChannel(imcName, testNS,
reconciletesting.WithInitInMemoryChannelConditions,
reconciletesting.WithInMemoryChannelDeploymentReady(),
reconciletesting.WithInMemoryChannelServiceReady(),
reconciletesting.WithInMemoryChannelEndpointsReady(),
reconciletesting.WithInMemoryChannelChannelServiceReady(),
reconciletesting.WithInMemoryChannelSubscribers(subscribers),
reconciletesting.WithInMemoryChannelAddress(channelServiceAddress)),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: reconciletesting.NewInMemoryChannel(imcName, testNS,
reconciletesting.WithInitInMemoryChannelConditions,
reconciletesting.WithInMemoryChannelDeploymentReady(),
reconciletesting.WithInMemoryChannelServiceReady(),
reconciletesting.WithInMemoryChannelEndpointsReady(),
reconciletesting.WithInMemoryChannelChannelServiceReady(),
reconciletesting.WithInMemoryChannelSubscribers(subscribers),
reconciletesting.WithInMemoryChannelStatusSubscribers(subscriberStatuses),
reconciletesting.WithInMemoryChannelAddress(channelServiceAddress)),
}},
WantErr: false,
}, {},
}
defer logtesting.ClearAll()
Expand Down
17 changes: 16 additions & 1 deletion pkg/reconciler/testing/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"context"
"time"

duckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1"
"github.com/knative/eventing/pkg/apis/messaging/v1alpha1"
"github.com/knative/pkg/apis"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// "k8s.io/apimachinery/pkg/types"
)

// InMemoryChannelOption enables further configuration of a InMemoryChannel.
Expand Down Expand Up @@ -56,6 +56,12 @@ func WithInMemoryChannelDeleted(imc *v1alpha1.InMemoryChannel) {
imc.ObjectMeta.SetDeletionTimestamp(&deleteTime)
}

func WithInMemoryChannelSubscribers(subscribers []duckv1alpha1.SubscriberSpec) InMemoryChannelOption {
return func(imc *v1alpha1.InMemoryChannel) {
imc.Spec.Subscribable = &duckv1alpha1.Subscribable{Subscribers: subscribers}
}
}

func WithInMemoryChannelDeploymentNotReady(reason, message string) InMemoryChannelOption {
return func(imc *v1alpha1.InMemoryChannel) {
imc.Status.MarkDispatcherFailed(reason, message)
Expand Down Expand Up @@ -112,3 +118,12 @@ func WithInMemoryChannelAddress(a string) InMemoryChannelOption {
})
}
}

func WithInMemoryChannelStatusSubscribers(subscriberStatuses []duckv1alpha1.SubscriberStatus) InMemoryChannelOption {
return func(imc *v1alpha1.InMemoryChannel) {
imc.Status.SubscribableTypeStatus = duckv1alpha1.SubscribableTypeStatus{
SubscribableStatus: &duckv1alpha1.SubscribableStatus{
Subscribers: subscriberStatuses},
}
}
}