Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/in_memory/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
readTimeout = 1 * time.Minute
writeTimeout = 1 * time.Minute
port = 8080
channelProvisioners = []string{"in-memory", "in-memory-channel"}
channelProvisioners = []string{"in-memory"}
)

func main() {
Expand Down
13 changes: 3 additions & 10 deletions config/provisioners/in-memory-channel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ They differ from most Channels in that they have:
### Deployment steps:

1. Setup [Knative Eventing](../../../DEVELOPMENT.md).
1. Apply the 'in-memory-channel' ClusterChannelProvisioner, Controller, and
1. Apply the 'in-memory' ClusterChannelProvisioner, Controller, and
Dispatcher.
```shell
ko apply -f config/provisioners/in-memory-channel/in-memory-channel.yaml
```
1. Create Channels that reference the 'in-memory-channel'.
1. Create Channels that reference the 'in-memory'.

```yaml
apiVersion: eventing.knative.dev/v1alpha1
Expand All @@ -34,7 +34,7 @@ They differ from most Channels in that they have:
provisioner:
apiVersion: eventing.knative.dev/v1alpha1
kind: ClusterChannelProvisioner
name: in-memory-channel
name: in-memory
```

### Components
Expand All @@ -59,10 +59,3 @@ Dispatcher for all in-memory Channels.
```shell
kubectl get deployment -n knative-eventing in-memory-channel-dispatcher
```

The Channel Dispatcher Config Map is used to send information about Channels and
Subscriptions from the Channel Controller to the Channel Dispatcher.

```shell
kubectl get configmap -n knative-eventing in-memory-channel-dispatcher-config-map
```
16 changes: 0 additions & 16 deletions config/provisioners/in-memory-channel/in-memory-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,6 @@ spec: {}

---

apiVersion: eventing.knative.dev/v1alpha1
kind: ClusterChannelProvisioner
metadata:
name: in-memory-channel
spec: {}

---

apiVersion: v1
kind: ServiceAccount
metadata:
Expand Down Expand Up @@ -69,14 +61,6 @@ rules:
- list
- watch
- create
- apiGroups:
- "" # Core API group.
resources:
- services
resourceNames:
- in-memory-channel-clusterbus
verbs:
- delete
- apiGroups:
- "" # Core API group.
resources:
Expand Down
2 changes: 1 addition & 1 deletion pkg/provisioners/inmemory/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const (
controllerAgentName = "in-memory-channel-controller"
)

// ProvideController returns a Controller that represents the in-memory-channel Provisioner.
// ProvideController returns a Controller that represents the in-memory Provisioner.
func ProvideController(mgr manager.Manager, logger *zap.Logger) (controller.Controller, error) {
// Setup a new controller to Reconcile Channels that belong to this Cluster Provisioner
// (in-memory channels).
Expand Down
10 changes: 0 additions & 10 deletions pkg/provisioners/inmemory/channel/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,6 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)

c.Status.InitializeConditions()

if usesDeprecatedProvisioner(c) {
c.Status.MarkDeprecated("ClusterChannelProvisionerDeprecated", "The `in-memory-channel` ClusterChannelProvisioner is deprecated and will be removed in 0.7. Recommended replacement is `in-memory`.")
}

// We are syncing K8s Service to talk to this Channel.
svc, err := util.CreateK8sService(ctx, r.client, c, util.ExternalService(c))
if err != nil {
Expand All @@ -145,9 +141,3 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
c.Status.MarkProvisioned()
return nil
}

func usesDeprecatedProvisioner(c *eventingv1alpha1.Channel) bool {
return c.Spec.Provisioner != nil &&
c.Spec.Provisioner.Namespace == "" &&
c.Spec.Provisioner.Name == "in-memory-channel"
}
3 changes: 1 addition & 2 deletions pkg/provisioners/inmemory/channel/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ import (
)

const (
ccpName = "in-memory-channel"
asyncCCPName = "in-memory"
ccpName = "in-memory"

cNamespace = "test-namespace"
cName = "test-channel"
Expand Down
38 changes: 4 additions & 34 deletions pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,18 @@ package clusterchannelprovisioner

import (
"context"
"fmt"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
util "github.com/knative/eventing/pkg/provisioners"
"github.com/knative/pkg/system"
)

const (
Expand All @@ -48,7 +45,7 @@ const (

var (
// provisionerNames contains the list of provisioners' names served by this controller
provisionerNames = []string{"in-memory-channel", "in-memory"}
provisionerNames = []string{"in-memory"}
)

type reconciler struct {
Expand All @@ -66,7 +63,7 @@ func (r *reconciler) InjectClient(c client.Client) error {
}

func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
//TODO use this to store the logger and set a deadline
// TODO use this to store the logger and set a deadline
ctx := context.TODO()
logger := r.logger.With(zap.Any("request", request))

Expand Down Expand Up @@ -167,42 +164,15 @@ func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.Cluste
logger.Warn("ClusterChannelProvisioner's K8s Service is not owned by the ClusterChannelProvisioner", zap.Any("clusterChannelProvisioner", ccp), zap.Any("service", svc))
}

// The name of the svc has changed since version 0.2.1. Hence, delete old dispatcher service (in-memory-channel-clusterbus)
// that was created previously in version 0.2.0 to ensure backwards compatibility.
err = r.deleteOldDispatcherService(ctx, ccp)
if err != nil {
logger.Info("Error deleting the old ClusterChannelProvisioner's K8s Service", zap.Error(err))
r.recorder.Eventf(ccp, corev1.EventTypeWarning, k8sServiceDeleteFailed, "Failed to delete the old ClusterChannelProvisioner's K8s Service: %v", err)
return err
}

ccp.Status.MarkReady()
return nil
}

// Since there are two provisioners "in-memory" and "in-memory-channel" but one single dispatcher service deployment,
// update the label of the K8s service to always point at the same dispatcher service deployment
func setDispatcherServiceSelector() util.ServiceOption {
return func(svc *v1.Service) error {
// There used to be an "in-memory-channel" provisioner. It was removed in 0.7, but it was
// the original, so the dispatcher is still labeled with its name.
svc.Spec.Selector = util.DispatcherLabels("in-memory-channel")
return nil
}
}

func (r *reconciler) deleteOldDispatcherService(ctx context.Context, ccp *eventingv1alpha1.ClusterChannelProvisioner) error {
svcName := fmt.Sprintf("%s-clusterbus", ccp.Name)
svcKey := types.NamespacedName{
Namespace: system.Namespace(),
Name: svcName,
}
svc := &corev1.Service{}
err := r.client.Get(ctx, svcKey, svc)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}

return r.client.Delete(ctx, svc)
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ import (
)

const (
ccpUID = "test-uid"
testErrorMessage = "test-induced-error"
testNS = "test-ns"
inMemoryChannelName = "in-memory-channel"
inMemoryName = "in-memory"
ccpUID = "test-uid"
testErrorMessage = "test-induced-error"
testNS = "test-ns"
dispatcherPodLabel = "in-memory-channel"
inMemoryName = "in-memory"
)

var (
Expand All @@ -65,8 +65,8 @@ var (

func init() {
// Add types to scheme
eventingv1alpha1.AddToScheme(scheme.Scheme)
corev1.AddToScheme(scheme.Scheme)
_ = eventingv1alpha1.AddToScheme(scheme.Scheme)
_ = corev1.AddToScheme(scheme.Scheme)
}

func TestInjectClient(t *testing.T) {
Expand Down Expand Up @@ -198,23 +198,6 @@ func TestReconcile(t *testing.T) {
events[ccpReconciled],
},
},
{
Name: "Delete old dispatcher",
InitialState: []runtime.Object{
makeClusterChannelProvisioner(),
makeOldK8sService(),
},
WantPresent: []runtime.Object{
makeReadyClusterChannelProvisioner(),
makeK8sService(),
},
WantAbsent: []runtime.Object{
makeOldK8sService(),
},
WantEvent: []corev1.Event{
events[ccpReconciled],
},
},
{
Name: "Create dispatcher - not owned by CCP",
InitialState: []runtime.Object{
Expand All @@ -241,20 +224,6 @@ func TestReconcile(t *testing.T) {
events[ccpReconciled],
},
},
{
Name: "Create dispatcher succeeds - in-memory-Channel",
ReconcileKey: inMemoryChannelName,
InitialState: []runtime.Object{
makeClusterChannelProvisionerOld(),
},
WantPresent: []runtime.Object{
makeReadyClusterChannelProvisionerOld(),
makeK8sServiceOld(),
},
WantEvent: []corev1.Event{
events[ccpReconciled],
},
},
{
Name: "Create dispatcher succeeds - request is namespace-scoped",
InitialState: []runtime.Object{
Expand Down Expand Up @@ -319,12 +288,6 @@ func TestReconcile(t *testing.T) {
}
}

func makeClusterChannelProvisionerOld() *eventingv1alpha1.ClusterChannelProvisioner {
ccp := makeClusterChannelProvisioner()
ccp.SetName(inMemoryChannelName)
return ccp
}

func makeClusterChannelProvisioner() *eventingv1alpha1.ClusterChannelProvisioner {
return &eventingv1alpha1.ClusterChannelProvisioner{
TypeMeta: metav1.TypeMeta{
Expand All @@ -349,12 +312,6 @@ func makeReadyClusterChannelProvisioner() *eventingv1alpha1.ClusterChannelProvis
return ccp
}

func makeReadyClusterChannelProvisionerOld() *eventingv1alpha1.ClusterChannelProvisioner {
ccp := makeReadyClusterChannelProvisioner()
ccp.Name = inMemoryChannelName
return ccp
}

func makeDeletingClusterChannelProvisioner() *eventingv1alpha1.ClusterChannelProvisioner {
ccp := makeClusterChannelProvisioner()
ccp.DeletionTimestamp = &deletionTime
Expand Down Expand Up @@ -383,7 +340,7 @@ func makeK8sService() *corev1.Service {
Labels: util.DispatcherLabels(inMemoryName),
},
Spec: corev1.ServiceSpec{
Selector: util.DispatcherLabels(inMemoryChannelName),
Selector: util.DispatcherLabels(dispatcherPodLabel),
Ports: []corev1.ServicePort{
{
Port: 80,
Expand All @@ -395,20 +352,6 @@ func makeK8sService() *corev1.Service {
}
}

func makeK8sServiceOld() *corev1.Service {
svc := makeK8sService()
svc.SetName(fmt.Sprintf("%s-dispatcher", inMemoryChannelName))
svc.GetOwnerReferences()[0].Name = inMemoryChannelName
svc.SetLabels(util.DispatcherLabels(inMemoryChannelName))
return svc
}

func makeOldK8sService() *corev1.Service {
svc := makeK8sService()
svc.ObjectMeta.Name = fmt.Sprintf("%s-clusterbus", inMemoryName)
return svc
}

func makeK8sServiceNotOwnedByClusterChannelProvisioner() *corev1.Service {
svc := makeK8sService()
svc.OwnerReferences = nil
Expand Down
11 changes: 1 addition & 10 deletions pkg/provisioners/multichannelfanout/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,9 @@ func NewConfigFromChannels(channels []v1alpha1.Channel) *Config {
Name: c.Name,
HostName: c.Status.Address.Hostname,
}

asyncHandler := true
// This is fairly hacky, but this is expected to change from a generic fanout sidecar to the
// in-memory dispatcher. And `in-memory-channel` is to be deleted in 0.7, so this shouldn't
// be here for long.
if c.Spec.Provisioner != nil && c.Spec.Provisioner.Name == "in-memory-channel" {
asyncHandler = false
}

if c.Spec.Subscribable != nil {
channelConfig.FanoutConfig = fanout.Config{
AsyncHandler: asyncHandler,
AsyncHandler: true,
Subscriptions: c.Spec.Subscribable.Subscribers,
}
}
Expand Down
28 changes: 2 additions & 26 deletions pkg/provisioners/multichannelfanout/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,30 +110,6 @@ func TestNewConfigFromChannels(t *testing.T) {
},
},
},
}, {
name: "in-memory-channel provisioner -- synchronous",
channels: []v1alpha1.Channel{
withProvisioner(
makeChannel("chan-1", "ns-1", "a.b.c.d", makeSubscribable(makeSubscriber("sub1"))),
&v1.ObjectReference{
Name: "in-memory-channel",
}),
},
expected: &Config{
ChannelConfigs: []ChannelConfig{
{
Name: "chan-1",
Namespace: "ns-1",
HostName: "a.b.c.d",
FanoutConfig: fanout.Config{
AsyncHandler: false,
Subscriptions: []eventingduck.ChannelSubscriberSpec{
makeSubscriber("sub1"),
},
},
},
},
},
},
}

Expand Down Expand Up @@ -168,9 +144,9 @@ func withProvisioner(c v1alpha1.Channel, p *v1.ObjectReference) v1alpha1.Channel
return c
}

func makeSubscribable(subsriberSpec ...eventingduck.ChannelSubscriberSpec) *eventingduck.Subscribable {
func makeSubscribable(subscriberSpec ...eventingduck.ChannelSubscriberSpec) *eventingduck.Subscribable {
return &eventingduck.Subscribable{
Subscribers: subsriberSpec,
Subscribers: subscriberSpec,
}
}

Expand Down
Loading