Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
99fddec
WIP
akashrv Apr 5, 2019
c08546a
Merge branch 'noistio2' of github.com:akashrv/eventing into noistio2
akashrv Apr 5, 2019
c642cea
WIP - In-memory working with E2E tests
akashrv Apr 6, 2019
2a4faae
WIP - remove istio dependency from in-memory channel
akashrv Apr 9, 2019
bd7ae68
UTs pass, E2E tests pass with in-memory as well as kafka
akashrv Apr 10, 2019
df4487f
fixed uts that failed due to last K8s service change
akashrv Apr 10, 2019
824efe6
Merge branch 'master' of github.com:knative/eventing into noistiokafka
akashrv Apr 10, 2019
23ae8b4
Removed unnecessary space from a line
akashrv Apr 10, 2019
a04d6d9
Merge branch 'noistio2' of github.com:akashrv/eventing into noistio2
akashrv Apr 10, 2019
bb7ab3e
Adding istio annotation to test POD. This will be needed when running E2E
akashrv Apr 10, 2019
83e519d
Merge branch 'noistio2' of github.com:akashrv/eventing into noistiokafka
akashrv Apr 10, 2019
c646dcd
Bug fix to set clusterIp of K8s service only when it is not of type E…
akashrv Apr 11, 2019
485f6b3
WIP kafka channel
akashrv Apr 11, 2019
6fd3378
Merge branch 'noistio2' of github.com:akashrv/eventing into noistiokafka
akashrv Apr 11, 2019
37bae81
WIP kafka - UTs and E2E pass
akashrv Apr 12, 2019
feb5e64
Updated code based on PR comments
akashrv Apr 12, 2019
3414ba3
Merge branch 'noistio2' of github.com:akashrv/eventing into noistiokafka
akashrv Apr 12, 2019
c1b8581
WIP
akashrv Apr 15, 2019
d2c831f
Updates based on PR comments
akashrv Apr 15, 2019
d61cd1a
Merge branch 'master' of github.com:knative/eventing into noistio2
akashrv Apr 15, 2019
16a6ffc
Updates based on PR comments
akashrv Apr 15, 2019
67611dc
Fixed UTs
akashrv Apr 15, 2019
2cc8525
Updated VENDOR_LICENSE
akashrv Apr 15, 2019
5b68a3f
Merge branch 'noistio2' of github.com:akashrv/eventing into noistiokafka
akashrv Apr 15, 2019
3b3f16f
WIP. Update fanout sidecar
akashrv Apr 15, 2019
f1904fe
Merge branch 'master' of github.com:knative/eventing into noistiokafka
akashrv Apr 15, 2019
f065d22
Merge from upstream master
akashrv Apr 15, 2019
a645dce
UTs pass, ITs passed. Code ready for PR
akashrv Apr 16, 2019
09e4dfa
Updates based on PR comments
akashrv Apr 16, 2019
d1a1bd5
Changes based on PR comments
akashrv Apr 18, 2019
2d52ee9
Merge branch 'master' of github.com:knative/eventing into noistiokafka
akashrv Apr 18, 2019
d71fecf
Added back permission that was removed by mistake
akashrv Apr 18, 2019
6f5d4f8
Remove istio references
akashrv Apr 18, 2019
9f53403
Removed one more reference of istio
akashrv Apr 19, 2019
10a0045
Merge branch 'noistiokafka' of github.com:akashrv/eventing into noist…
akashrv Apr 19, 2019
f17056b
Merge branch 'master' of github.com:knative/eventing into noistiokafka
akashrv Apr 19, 2019
a2e3d81
Updates based on PR comments. Ready to merge into master
akashrv Apr 19, 2019
d71b410
Fixed a typo
akashrv Apr 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 1 addition & 64 deletions cmd/fanoutsidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,9 @@ import (

"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/channelwatcher"
"github.com/knative/eventing/pkg/logging"
"github.com/knative/eventing/pkg/sidecar/fanout"
"github.com/knative/eventing/pkg/sidecar/multichannelfanout"
"github.com/knative/eventing/pkg/sidecar/swappable"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
Expand Down Expand Up @@ -137,48 +131,11 @@ func setupChannelWatcher(logger *zap.Logger, configUpdated swappable.UpdateConfi
logger.Error("Error while adding eventing scheme to manager.", zap.Error(err))
return nil, err
}
channelwatcher.New(mgr, logger, updateChannelConfig(configUpdated))
channelwatcher.New(mgr, logger, channelwatcher.UpdateConfigWatchHandler(configUpdated, shouldWatch))

return mgr, nil
}

func updateChannelConfig(updateConfig swappable.UpdateConfig) channelwatcher.WatchHandlerFunc {
return func(ctx context.Context, c client.Client, chanNamespacedName types.NamespacedName) error {
channels, err := listAllChannels(ctx, c)
if err != nil {
logging.FromContext(ctx).Info("Unable to list channels", zap.Error(err))
return err
}
config := multiChannelFanoutConfig(channels)
return updateConfig(config)
}
}

func listAllChannels(ctx context.Context, c client.Client) ([]v1alpha1.Channel, error) {
channels := make([]v1alpha1.Channel, 0)
for {
cl := &v1alpha1.ChannelList{}
opts := &client.ListOptions{
// Set Raw because if we need to get more than one page, then we will put the continue token
// into opts.Raw.Continue.
Raw: &metav1.ListOptions{},
}
if err := c.List(ctx, opts, cl); err != nil {
return nil, err
}
for _, c := range cl.Items {
if c.Status.IsReady() && shouldWatch(&c) {
channels = append(channels, c)
}
}
if cl.Continue != "" {
opts.Raw.Continue = cl.Continue
} else {
return channels, nil
}
}
}

func shouldWatch(ch *v1alpha1.Channel) bool {
if ch.Spec.Provisioner != nil && ch.Spec.Provisioner.Namespace == "" {
for _, v := range channelProvisioners {
Expand All @@ -190,26 +147,6 @@ func shouldWatch(ch *v1alpha1.Channel) bool {
return false
}

func multiChannelFanoutConfig(channels []v1alpha1.Channel) *multichannelfanout.Config {
cc := make([]multichannelfanout.ChannelConfig, 0)
for _, c := range channels {
channelConfig := multichannelfanout.ChannelConfig{
Namespace: c.Namespace,
Name: c.Name,
HostName: c.Status.Address.Hostname,
}
if c.Spec.Subscribable != nil {
channelConfig.FanoutConfig = fanout.Config{
Subscriptions: c.Spec.Subscribable.Subscribers,
}
}
cc = append(cc, channelConfig)
}
return &multichannelfanout.Config{
ChannelConfigs: cc,
}
}

// runnableServer is a small wrapper around http.Server so that it matches the manager.Runnable
// interface.
type runnableServer struct {
Expand Down
5 changes: 4 additions & 1 deletion contrib/gcppubsub/pkg/dispatcher/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ func main() {
// PubSub) and the dispatcher (takes messages in PubSub and sends them in cluster) in this
// binary.

_, runnables := receiver.New(logger.Desugar(), mgr.GetClient(), util.GcpPubSubClientCreator)
_, runnables, err := receiver.New(logger.Desugar(), mgr.GetClient(), util.GcpPubSubClientCreator)
if err != nil {
logger.Fatal("Unable to create new receiver and runnable", zap.Error(err))
}
for _, runnable := range runnables {
err = mgr.Add(runnable)
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions contrib/gcppubsub/pkg/dispatcher/receiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,22 @@ type Receiver struct {

// New creates a new Receiver and its associated MessageReceiver. The caller is responsible for
// Start()ing the returned MessageReceiver.
func New(logger *zap.Logger, client client.Client, pubSubClientCreator util.PubSubClientCreator) (*Receiver, []manager.Runnable) {
func New(logger *zap.Logger, client client.Client, pubSubClientCreator util.PubSubClientCreator) (*Receiver, []manager.Runnable, error) {
r := &Receiver{
logger: logger,
client: client,

pubSubClientCreator: pubSubClientCreator,
cache: cache.NewTTL(),
}
return r, []manager.Runnable{r.newMessageReceiver(), r.cache}
receiver, err := r.newMessageReceiver()
if err != nil {
return nil, nil, err
}
return r, []manager.Runnable{receiver, r.cache}, nil
}

func (r *Receiver) newMessageReceiver() *provisioners.MessageReceiver {
func (r *Receiver) newMessageReceiver() (*provisioners.MessageReceiver, error) {
return provisioners.NewMessageReceiver(r.sendEventToTopic, r.logger.Sugar())
}

Expand Down
11 changes: 9 additions & 2 deletions contrib/gcppubsub/pkg/dispatcher/receiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,21 @@ func TestReceiver(t *testing.T) {
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
mr, _ := New(
mr, _, err := New(
zap.NewNop(),
fake.NewFakeClient(tc.initialState...),
fakepubsub.Creator(tc.pubSubData))
if err != nil {
t.Fatalf("Error when creating a New receiver. Error:%s", err)
}
resp := httptest.NewRecorder()
req := httptest.NewRequest("POST", "/", strings.NewReader(validMessage))
req.Host = "test-channel.test-namespace.channels." + utils.GetClusterDomainName()
mr.newMessageReceiver().HandleRequest(resp, req)
receiver, err := mr.newMessageReceiver()
if err != nil {
t.Fatalf("Error when creating a new message receiver. Error:%s", err)
}
receiver.HandleRequest(resp, req)
if tc.expectedErr {
if resp.Result().StatusCode >= 200 && resp.Result().StatusCode < 300 {
t.Errorf("Expected an error. Actual: %v", resp.Result())
Expand Down
2 changes: 0 additions & 2 deletions contrib/kafka/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/knative/eventing/contrib/kafka/pkg/controller/channel"
eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/config"
Expand Down Expand Up @@ -47,7 +46,6 @@ func _main() int {
// Add custom types to this array to get them into the manager's scheme.
schemeFuncs := []SchemeFunc{
eventingv1alpha.AddToScheme,
istiov1alpha3.AddToScheme,
}
for _, schemeFunc := range schemeFuncs {
schemeFunc(mgr.GetScheme())
Expand Down
38 changes: 13 additions & 25 deletions contrib/kafka/cmd/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,25 @@ package main

import (
"flag"
"fmt"
"log"
"os"

"github.com/knative/eventing/contrib/kafka/pkg/controller"
provisionerController "github.com/knative/eventing/contrib/kafka/pkg/controller"
"github.com/knative/eventing/contrib/kafka/pkg/dispatcher"
"github.com/knative/eventing/pkg/sidecar/configmap/watcher"
"github.com/knative/eventing/pkg/utils"
"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/channelwatcher"
"github.com/knative/pkg/signals"
"github.com/knative/pkg/system"
"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
configMapName := os.Getenv("DISPATCHER_CONFIGMAP_NAME")
if configMapName == "" {
configMapName = provisionerController.DispatcherConfigMapName
}
configMapNamespace := os.Getenv("DISPATCHER_CONFIGMAP_NAMESPACE")
if configMapNamespace == "" {
configMapNamespace = system.Namespace()
}

flag.Parse()
logger, err := zap.NewProduction()
if err != nil {
log.Fatalf("unable to create logger: %v", err)
}

provisionerConfig, err := provisionerController.GetProvisionerConfig("/etc/config-provisioner")
if err != nil {
logger.Fatal("unable to load provisioner config", zap.Error(err))
Expand All @@ -68,17 +55,12 @@ func main() {
logger.Fatal("Unable to add kafkaDispatcher", zap.Error(err))
}

kc, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
logger.Fatal("unable to create kubernetes client.", zap.Error(err))
if err := v1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatal("Unable to add scheme for eventing apis.", zap.Error(err))
}

cmw, err := watcher.NewWatcher(logger, kc, configMapNamespace, configMapName, kafkaDispatcher.UpdateConfig)
if err != nil {
logger.Fatal("unable to create configMap watcher", zap.String("configMap", fmt.Sprintf("%s/%s", configMapNamespace, configMapName)))
}
if err = mgr.Add(utils.NewBlockingStart(logger, cmw)); err != nil {
logger.Fatal("Unable to add the configMap watcher to the manager", zap.Error(err))
if err := channelwatcher.New(mgr, logger, channelwatcher.UpdateConfigWatchHandler(kafkaDispatcher.UpdateConfig, shouldWatch)); err != nil {
logger.Fatal("Unable to create channel watcher.", zap.Error(err))
}

// set up signals so we handle the first shutdown signal gracefully
Expand All @@ -89,3 +71,9 @@ func main() {
}
logger.Info("Exiting...")
}

func shouldWatch(ch *v1alpha1.Channel) bool {
return ch.Spec.Provisioner != nil &&
ch.Spec.Provisioner.Namespace == "" &&
ch.Spec.Provisioner.Name == controller.Name
}
32 changes: 11 additions & 21 deletions contrib/kafka/config/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,22 +68,12 @@ rules:
verbs:
- update
- apiGroups:
- networking.istio.io
- "" # Core API Group.
resources:
- virtualservices
- events
verbs:
- get
- list
- watch
- create
- update
- apiGroups:
- "" # Core API Group.
Comment thread
akashrv marked this conversation as resolved.
resources:
- events
verbs:
- create
- patch
- create
- patch
- update
---

Expand Down Expand Up @@ -170,6 +160,13 @@ rules:
- get
Comment thread
akashrv marked this conversation as resolved.
- list
- watch
- apiGroups:
Comment thread
akashrv marked this conversation as resolved.
- eventing.knative.dev
Comment thread
akashrv marked this conversation as resolved.
resources:
- channels
verbs:
- list
- watch

---

Expand Down Expand Up @@ -211,13 +208,6 @@ spec:
containers:
- name: dispatcher
image: github.com/knative/eventing/contrib/kafka/cmd/dispatcher
env:
- name: DISPATCHER_CONFIGMAP_NAME
value: kafka-channel-dispatcher
- name: DISPATCHER_CONFIGMAP_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
volumeMounts:
- name: kafka-channel-controller-config
mountPath: /etc/config-provisioner
Expand Down
2 changes: 0 additions & 2 deletions contrib/kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/knative/eventing/contrib/kafka/pkg/controller/channel"
eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client/config"
Expand Down Expand Up @@ -47,7 +46,6 @@ func main() {
// Add custom types to this array to get them into the manager's scheme.
schemeFuncs := []SchemeFunc{
eventingv1alpha.AddToScheme,
istiov1alpha3.AddToScheme,
}
for _, schemeFunc := range schemeFuncs {
schemeFunc(mgr.GetScheme())
Expand Down
24 changes: 7 additions & 17 deletions contrib/kafka/pkg/controller/channel/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package channel

import (
"github.com/Shopify/sarama"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
Comment thread
akashrv marked this conversation as resolved.
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -49,11 +48,10 @@ var (
)

type reconciler struct {
client client.Client
recorder record.EventRecorder
logger *zap.Logger
config *common.KafkaProvisionerConfig
configMapKey client.ObjectKey
client client.Client
recorder record.EventRecorder
logger *zap.Logger
config *common.KafkaProvisionerConfig
// Using a shared kafkaClusterAdmin does not work currently because of an issue with
// Shopify/sarama, see https://github.com/Shopify/sarama/issues/1162.
kafkaClusterAdmin sarama.ClusterAdmin
Expand All @@ -67,10 +65,9 @@ func ProvideController(mgr manager.Manager, config *common.KafkaProvisionerConfi
// Setup a new controller to Reconcile Channel.
c, err := controller.New(controllerAgentName, mgr, controller.Options{
Reconciler: &reconciler{
recorder: mgr.GetRecorder(controllerAgentName),
logger: logger,
config: config,
configMapKey: defaultConfigMapKey,
recorder: mgr.GetRecorder(controllerAgentName),
logger: logger,
config: config,
},
})
if err != nil {
Expand All @@ -89,13 +86,6 @@ func ProvideController(mgr manager.Manager, config *common.KafkaProvisionerConfi
return nil, err
}

// Watch the VirtualServices that are owned by Channels.
err = c.Watch(&source.Kind{Type: &istiov1alpha3.VirtualService{}}, &handler.EnqueueRequestForOwner{OwnerType: &eventingv1alpha1.Channel{}, IsController: true})
if err != nil {
logger.Error("unable to watch VirtualServices.", zap.Error(err))
return nil, err
}

return c, nil
}

Expand Down
Loading