Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions config/provisioners/kafka/kafka-provisioner.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,24 @@ rules:
- list
- watch
- update
- apiGroups:
- "" # Core API group.
resources:
- services
verbs:
- get
- list
- watch
- create
- apiGroups:
- networking.istio.io
resources:
- virtualservices
verbs:
- get
- list
- watch
- create
---

apiVersion: rbac.authorization.k8s.io/v1beta1
Expand Down
20 changes: 18 additions & 2 deletions pkg/provisioners/kafka/controller/channel/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package channel

import (
"github.com/Shopify/sarama"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -27,7 +29,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
common "github.com/knative/eventing/pkg/provisioners/kafka/controller"
)

Expand Down Expand Up @@ -65,7 +67,21 @@ func ProvideController(mgr manager.Manager, config *common.KafkaProvisionerConfi
}

// Watch Channel events and enqueue Channel object key.
if err := c.Watch(&source.Kind{Type: &v1alpha1.Channel{}}, &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(&source.Kind{Type: &eventingv1alpha1.Channel{}}, &handler.EnqueueRequestForObject{}); err != nil {
return nil, err
}

// Watch the K8s Services that are owned by Channels.
err = c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.Channel{}, IsController: true})
if err != nil {
logger.Error("unable to watch K8s Services.", zap.Error(err))
return nil, err
}

// Watch the VirtualServices that are owned by Channels.
err = c.Watch(&source.Kind{Type: &istiov1alpha3.VirtualService{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.Channel{}, IsController: true})
if err != nil {
logger.Error("unable to watch VirtualServices.", zap.Error(err))
return nil, err
}

Expand Down
83 changes: 38 additions & 45 deletions pkg/provisioners/kafka/controller/channel/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@ import (
"fmt"

"github.com/Shopify/sarama"
"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/util/sets"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
eventingController "github.com/knative/eventing/pkg/controller"
util "github.com/knative/eventing/pkg/provisioners"
"github.com/knative/eventing/pkg/provisioners/kafka/controller"
)

Expand All @@ -41,7 +42,7 @@ const (
)

type channelArgs struct {
NumPartitions int32 `json:"NumPartitions,omitempty"`
NumPartitions int32
}

// Reconcile compares the actual state with the desired, and attempts to
Expand Down Expand Up @@ -88,13 +89,13 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
if clusterChannelProvisioner.Status.IsReady() {
// Reconcile this copy of the Channel and then write back any status
// updates regardless of whether the reconcile error out.
err = r.reconcile(newChannel)
err = r.reconcile(ctx, newChannel)
} else {
newChannel.Status.MarkNotProvisioned("NotProvisioned", "ClusterChannelProvisioner %s is not ready", clusterChannelProvisioner.Name)
err = fmt.Errorf("ClusterChannelProvisioner %s is not ready", clusterChannelProvisioner.Name)
}

if updateChannelErr := r.updateChannel(ctx, newChannel); updateChannelErr != nil {
if updateChannelErr := util.UpdateChannel(ctx, r.client, newChannel); updateChannelErr != nil {
r.logger.Info("failed to update channel status", zap.Error(updateChannelErr))
return reconcile.Result{}, updateChannelErr
}
Expand All @@ -103,7 +104,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
return reconcile.Result{}, err
}

func (r *reconciler) reconcile(channel *v1alpha1.Channel) error {
func (r *reconciler) reconcile(ctx context.Context, channel *v1alpha1.Channel) error {

// We don't currently initialize r.kafkaClusterAdmin, hence we end up creating the cluster admin client every time.
// This is because of an issue with Shopify/sarama. See https://github.com/Shopify/sarama/issues/1162.
Expand Down Expand Up @@ -131,16 +132,44 @@ func (r *reconciler) reconcile(channel *v1alpha1.Channel) error {
if err := r.deprovisionChannel(channel, kafkaClusterAdmin); err != nil {
return err
}
r.removeFinalizer(channel)
util.RemoveFinalizer(channel, finalizerName)
return nil
}

r.addFinalizer(channel)
util.AddFinalizer(channel, finalizerName)

if err := r.provisionChannel(channel, kafkaClusterAdmin); err != nil {
channel.Status.MarkNotProvisioned("NotProvisioned", "error while provisioning: %s", err)
return err
}

svc, err := util.CreateK8sService(ctx, r.client, channel)

if err != nil {
r.logger.Info("error creating the Channel's K8s Service", zap.Error(err))
return err
}

// Check if this Channel is the owner of the K8s service.
if !metav1.IsControlledBy(svc, channel) {
r.logger.Warn("Channel's K8s Service is not owned by the Channel", zap.Any("channel", channel), zap.Any("service", svc))
}

channel.Status.SetAddress(eventingController.ServiceHostName(svc.Name, svc.Namespace))

virtualService, err := util.CreateVirtualService(ctx, r.client, channel)

if err != nil {
r.logger.Info("error creating the Virtual Service for the Channel", zap.Error(err))
return err
}

// If the Virtual Service is not controlled by this Channel, we should log a warning, but don't
// consider it an error.
if !metav1.IsControlledBy(virtualService, channel) {
r.logger.Warn("VirtualService not owned by Channel", zap.Any("channel", channel), zap.Any("virtualService", virtualService))
}

channel.Status.MarkProvisioned()

// close the connection
Expand Down Expand Up @@ -207,42 +236,6 @@ func (r *reconciler) getClusterChannelProvisioner() (*v1alpha1.ClusterChannelPro
return clusterChannelProvisioner, nil
}

func (r *reconciler) updateChannel(ctx context.Context, u *v1alpha1.Channel) error {
channel := &v1alpha1.Channel{}
err := r.client.Get(ctx, client.ObjectKey{Namespace: u.Namespace, Name: u.Name}, channel)
if err != nil {
return err
}

updated := false
if !equality.Semantic.DeepEqual(channel.Finalizers, u.Finalizers) {
channel.SetFinalizers(u.ObjectMeta.Finalizers)
updated = true
}

if !equality.Semantic.DeepEqual(channel.Status, u.Status) {
channel.Status = u.Status
updated = true
}

if updated == false {
return nil
}
return r.client.Update(ctx, channel)
}

func (r *reconciler) addFinalizer(channel *v1alpha1.Channel) {
finalizers := sets.NewString(channel.Finalizers...)
finalizers.Insert(finalizerName)
channel.Finalizers = finalizers.List()
}

func (r *reconciler) removeFinalizer(channel *v1alpha1.Channel) {
finalizers := sets.NewString(channel.Finalizers...)
finalizers.Delete(finalizerName)
channel.Finalizers = finalizers.List()
}

func createKafkaAdminClient(config *controller.KafkaProvisionerConfig) (sarama.ClusterAdmin, error) {
saramaConf := sarama.NewConfig()
saramaConf.Version = sarama.V1_1_0_0
Expand Down
54 changes: 54 additions & 0 deletions pkg/provisioners/kafka/controller/channel/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/Shopify/sarama"
"github.com/google/go-cmp/cmp"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -36,23 +37,28 @@ import (
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
controllertesting "github.com/knative/eventing/pkg/controller/testing"
"github.com/knative/eventing/pkg/provisioners"
util "github.com/knative/eventing/pkg/provisioners"
"github.com/knative/eventing/pkg/provisioners/kafka/controller"
)

const (
channelName = "test-channel"
clusterChannelProvisionerName = "kafka-channel"
testNS = "test-namespace"
testUID = "test-uid"
argumentNumPartitions = "NumPartitions"
)

var (
truePointer = true

deletedTs = metav1.Now().Rfc3339Copy()
)

func init() {
// Add types to scheme
eventingv1alpha1.AddToScheme(scheme.Scheme)
istiov1alpha3.AddToScheme(scheme.Scheme)
}

var mockFetchError = controllertesting.Mocks{
Expand Down Expand Up @@ -124,6 +130,7 @@ var testCases = []controllertesting.TestCase{
InitialState: []runtime.Object{
getNewClusterChannelProvisioner(clusterChannelProvisionerName, true),
getNewChannel(channelName, clusterChannelProvisionerName),
makeVirtualService(),
},
ReconcileKey: fmt.Sprintf("%s/%s", testNS, channelName),
WantResult: reconcile.Result{},
Expand Down Expand Up @@ -425,6 +432,7 @@ func getNewChannelWithArgs(name string, args map[string]interface{}) *eventingv1
func getNewChannelProvisionedStatus(name, provisioner string) *eventingv1alpha1.Channel {
c := getNewChannel(name, provisioner)
c.Status.InitializeConditions()
c.Status.SetAddress(fmt.Sprintf("%s-channel.%s.svc.cluster.local", c.Name, c.Namespace))
c.Status.MarkProvisioned()
c.Finalizers = []string{finalizerName}
return c
Expand Down Expand Up @@ -478,6 +486,52 @@ func getNewClusterChannelProvisioner(name string, isReady bool) *eventingv1alpha
return clusterChannelProvisioner
}

func makeVirtualService() *istiov1alpha3.VirtualService {
return &istiov1alpha3.VirtualService{
TypeMeta: metav1.TypeMeta{
APIVersion: istiov1alpha3.SchemeGroupVersion.String(),
Kind: "VirtualService",
},
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-channel", testNS),
Namespace: testNS,
Labels: map[string]string{
"channel": channelName,
"provisioner": clusterChannelProvisionerName,
},
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
Kind: "Channel",
Name: channelName,
UID: testUID,
Controller: &truePointer,
BlockOwnerDeletion: &truePointer,
},
},
},
Spec: istiov1alpha3.VirtualServiceSpec{
Hosts: []string{
fmt.Sprintf("%s-channel.%s.svc.cluster.local", channelName, testNS),
fmt.Sprintf("%s.%s.channels.cluster.local", channelName, testNS),
},
Http: []istiov1alpha3.HTTPRoute{{
Rewrite: &istiov1alpha3.HTTPRewrite{
Authority: fmt.Sprintf("%s.%s.channels.cluster.local", channelName, testNS),
},
Route: []istiov1alpha3.DestinationWeight{{
Destination: istiov1alpha3.Destination{
Host: "kafka-provisioner.knative-eventing.svc.cluster.local",
Port: istiov1alpha3.PortSelector{
Number: util.PortNumber,
},
}},
}},
},
},
}
}

func om(namespace, name string) metav1.ObjectMeta {
return metav1.ObjectMeta{
Namespace: namespace,
Expand Down
12 changes: 11 additions & 1 deletion pkg/provisioners/kafka/controller/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ limitations under the License.
package controller

import (
"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
)

const (
Expand Down Expand Up @@ -63,6 +66,13 @@ func ProvideController(mgr manager.Manager, config *KafkaProvisionerConfig, logg
return nil, err
}

// Watch the K8s Services that are owned by ClusterChannelProvisioners.
err = c.Watch(&source.Kind{Type: &corev1.Service{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.ClusterChannelProvisioner{}, IsController: true})
if err != nil {
logger.Error("unable to watch K8s Services.", zap.Error(err))
return nil, err
}

return c, nil
}

Expand Down
Loading