Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
42e8a3a
Commit to pull from master
akashrv May 30, 2019
e80cc8d
Merge branch 'master' of github.com:knative/eventing into 1189
akashrv May 30, 2019
b697348
WIP, before pulling channelable changes
akashrv May 30, 2019
dd1f22c
(WIP) Merge branch 'master' of github.com:knative/eventing into 1189
akashrv May 30, 2019
30f6f60
WIP - merged subscribable and addressable types
akashrv May 31, 2019
7070dce
WIP before pulling changes from main
akashrv May 31, 2019
3044483
Merge branch 'master' of github.com:knative/eventing into 1189
akashrv May 31, 2019
625cf2c
WIP before merging upstrea mmaster
akashrv Jun 3, 2019
f6db117
Merge branch 'master' of github.com:knative/eventing into 1189
akashrv Jun 3, 2019
4396d13
WIP - before merging kafka SetAddress changes from upstream master
akashrv Jun 3, 2019
2b76447
Merge branch 'master' of github.com:knative/eventing into 1189
akashrv Jun 3, 2019
ffd53aa
WIP - Manual E2E test works for kafka channel.
akashrv Jun 4, 2019
f776064
ready to send WIP PR
akashrv Jun 4, 2019
f707260
Ville's comments on PR
akashrv Jun 4, 2019
8c06975
Merge branch 'master' of github.com:knative/eventing into 1189
akashrv Jun 4, 2019
13fa609
Fixed subscription uts
akashrv Jun 4, 2019
2a5455b
Fixed UTs
akashrv Jun 5, 2019
cd2a98d
Updated comments in code
akashrv Jun 5, 2019
8cc3323
Merge branch 'master' of github.com:knative/eventing into 1189
akashrv Jun 5, 2019
03b1b3f
Updated kafka dispatcher UpdateConfig to remove code redundancy
akashrv Jun 5, 2019
2f7350f
Changes based on PR comments from harwayne
akashrv Jun 5, 2019
75b42a6
Merge branch 'master' of github.com:knative/eventing into 1189
akashrv Jun 5, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/300-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ spec:
uid:
type: string
minLength: 1
generation:
type: integer
uid:
type: string
minLength: 1
Expand Down
2 changes: 1 addition & 1 deletion contrib/gcppubsub/pkg/controller/channel/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ func TestReconcile(t *testing.T) {
WantPresent: []runtime.Object{
makeChannelWithFinalizerAndSubscriberWithoutUID(),
},
WantErrMsg: "empty reference UID: {nil http://foo/ }",
WantErrMsg: "empty reference UID: {nil 0 http://foo/ }",
WantEvent: []corev1.Event{
events[gcpResourcesPlanFailed],
},
Expand Down
4 changes: 3 additions & 1 deletion contrib/kafka/cmd/channel_controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"flag"
"log"

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

clientset "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned"
eventingScheme "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned/scheme"
informers "github.com/knative/eventing/contrib/kafka/pkg/client/informers/externalversions"
Expand Down Expand Up @@ -63,7 +66,6 @@ func main() {
logger.Fatalw("Error building kubeconfig", zap.Error(err))
}

// TODO the underlying config map needs to be watched and the config should be reloaded if there is a change.
Comment thread
akashrv marked this conversation as resolved.
kafkaConfig, err := utils.GetKafkaConfig("/etc/config-kafka")
if err != nil {
logger.Fatalw("Error loading kafka config", zap.Error(err))
Expand Down
3 changes: 3 additions & 0 deletions contrib/kafka/cmd/channel_dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"flag"
"log"

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

clientset "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned"
eventingScheme "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned/scheme"
informers "github.com/knative/eventing/contrib/kafka/pkg/client/informers/externalversions"
Expand Down
8 changes: 7 additions & 1 deletion contrib/kafka/config/200-dispatcher-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@ rules:
- messaging.knative.dev
resources:
- kafkachannels
- kafkachannels/status
verbs:
- get
- list
- watch
- apiGroups:
- messaging.knative.dev
resources:
- kafkachannels/status
verbs:
- get
- update
- apiGroups:
- "" # Core API group.
resources:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"github.com/knative/pkg/apis"
"github.com/knative/pkg/apis/duck/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)
Expand Down Expand Up @@ -75,7 +76,11 @@ func (cs *KafkaChannelStatus) InitializeConditions() {
kc.Manage(cs).InitializeConditions()
}

// SetAddress sets the address (as part of Addressable contract) and marks the correct condition.
func (cs *KafkaChannelStatus) SetAddress(url *apis.URL) {
if cs.Address == nil {
cs.Address = &v1alpha1.Addressable{}
}
if url != nil {
cs.Address.Hostname = url.Host
cs.Address.URL = url
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,20 +376,22 @@ func TestKafkaChannelStatus_SetAddressable(t *testing.T) {
},
},
},
AddressStatus: duckv1alpha1.AddressStatus{Address: &duckv1alpha1.Addressable{}},
},
},
"has domain": {
url: &apis.URL{Scheme: "http", Host: "test-domain"},
want: &KafkaChannelStatus{
Address: duckv1alpha1.Addressable{
Addressable: duckv1beta1.Addressable{
URL: &apis.URL{
Scheme: "http",
Host: "test-domain",
AddressStatus: duckv1alpha1.AddressStatus{
Address: &duckv1alpha1.Addressable{
Addressable: duckv1beta1.Addressable{
URL: &apis.URL{
Scheme: "http",
Host: "test-domain",
},
},
},
Hostname: "test-domain",
},
Hostname: "test-domain",
}},
Status: duckv1beta1.Status{
Conditions: []apis.Condition{
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,10 @@ type KafkaChannelStatus struct {
// provided targets from inside the cluster.
//
// It generally has the form {channel}.{namespace}.svc.{cluster domain name}
Address duckv1alpha1.Addressable `json:"address,omitempty"`
duckv1alpha1.AddressStatus `json:",inline"`

// Subscribers is populated with the statuses of each of the Channelable's subscribers.
eventingduck.SubscribableTypeStatus `json:",inline"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

106 changes: 63 additions & 43 deletions contrib/kafka/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ type KafkaDispatcher struct {
// TODO: config doesn't have to be atomic as it is read and updated using updateLock.
config atomic.Value
hostToChannelMap atomic.Value
updateLock sync.Mutex
// hostToChannelMapLock is used to update hostToChannelMap
hostToChannelMapLock sync.Mutex
Comment thread
akashrv marked this conversation as resolved.

receiver *provisioners.MessageReceiver
dispatcher *provisioners.MessageDispatcher

kafkaAsyncProducer sarama.AsyncProducer
kafkaConsumers map[provisioners.ChannelReference]map[subscription]KafkaConsumer
// consumerUpdateLock must be used to update kafkaConsumers
consumerUpdateLock sync.Mutex
kafkaCluster KafkaCluster

topicFunc TopicFunc
Expand Down Expand Up @@ -104,61 +107,78 @@ func (d *KafkaDispatcher) configDiff(updated *multichannelfanout.Config) string
return cmp.Diff(d.getConfig(), updated)
}

func (d *KafkaDispatcher) UpdateConfig(config *multichannelfanout.Config) error {
// UpdateKafkaConsumers will be called by new CRD based kafka channel dispatcher controller, instead of UpdateConfig.
func (d *KafkaDispatcher) UpdateKafkaConsumers(config *multichannelfanout.Config) (map[eventingduck.SubscriberSpec]error, error) {
Comment thread
akashrv marked this conversation as resolved.
if config == nil {
return errors.New("nil config")
return nil, fmt.Errorf("nil config")
}

d.updateLock.Lock()
defer d.updateLock.Unlock()
d.consumerUpdateLock.Lock()
defer d.consumerUpdateLock.Unlock()

if diff := d.configDiff(config); diff != "" {
d.logger.Info("Updating config (-old +new)", zap.String("diff", diff))

// Create hostToChannelMap before updating kafkaConsumers.
// But update the map only after updating kafkaConsumers.
hcMap, err := createHostToChannelMap(config)
if err != nil {
return err
newSubs := make(map[subscription]bool)
failedToSubscribe := make(map[eventingduck.SubscriberSpec]error)
for _, cc := range config.ChannelConfigs {
channelRef := provisioners.ChannelReference{
Name: cc.Name,
Namespace: cc.Namespace,
}

newSubs := make(map[subscription]bool)

// Subscribe to new subscriptions.
// TODO: Error returned by subscribe/unsubscribe must be handled.
// https://github.com/knative/eventing/issues/1072.
for _, cc := range config.ChannelConfigs {
channelRef := provisioners.ChannelReference{
Name: cc.Name,
Namespace: cc.Namespace,
}
for _, subSpec := range cc.FanoutConfig.Subscriptions {
sub := newSubscription(subSpec)
if _, ok := d.kafkaConsumers[channelRef][sub]; !ok {
// only subscribe when not exists in channel-subscriptions map
// do not need to resubscribe every time channel fanout config is updated
d.subscribe(channelRef, sub)
for _, subSpec := range cc.FanoutConfig.Subscriptions {
sub := newSubscription(subSpec)
if _, ok := d.kafkaConsumers[channelRef][sub]; !ok {
// only subscribe when not exists in channel-subscriptions map
// do not need to resubscribe every time channel fanout config is updated
if err := d.subscribe(channelRef, sub); err != nil {
failedToSubscribe[subSpec] = err
}

newSubs[sub] = true
}
newSubs[sub] = true
}
}

// Unsubscribe and close consumer for any deleted subscriptions
for channelRef, subMap := range d.kafkaConsumers {
for sub := range subMap {
if ok := newSubs[sub]; !ok {
d.unsubscribe(channelRef, sub)
}
// Unsubscribe and close consumer for any deleted subscriptions
for channelRef, subMap := range d.kafkaConsumers {
for sub := range subMap {
if ok := newSubs[sub]; !ok {
d.unsubscribe(channelRef, sub)
}
}
// At this point all updates are done and hostToChannelMap is created successfully.
// Update the atomic value.
d.setHostToChannelMap(hcMap)
}
return failedToSubscribe, nil
}

// UpdateHostToChannelMap will be called by new CRD based kafka channel dispatcher controller, instead of UpdateConfig.
func (d *KafkaDispatcher) UpdateHostToChannelMap(config *multichannelfanout.Config) error {
if config == nil {
return errors.New("nil config")
}

d.hostToChannelMapLock.Lock()
defer d.hostToChannelMapLock.Unlock()

// Update the config so that it can be used for comparison during next sync
d.setConfig(config)
hcMap, err := createHostToChannelMap(config)
if err != nil {
return err
}

d.setHostToChannelMap(hcMap)
return nil
}

// UpdateConfig is used by older kafka channel dispatcher controller that is based on ClusterChannelProvisioners model
// Remove this function when the older channel code is deleted
func (d *KafkaDispatcher) UpdateConfig(config *multichannelfanout.Config) error {
if config == nil {
return errors.New("nil config")
}

if _, err := d.UpdateKafkaConsumers(config); err != nil {
return err
}
if err := d.UpdateHostToChannelMap(config); err != nil {
return err
}

return nil
}

Expand Down
6 changes: 0 additions & 6 deletions contrib/kafka/pkg/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,6 @@ func TestDispatcher_UpdateConfig(t *testing.T) {
if diff := sets.NewString(tc.unsubscribes...).Difference(oldSubscribers); diff.Len() != 0 {
t.Errorf("subscriptions %+v were never subscribed", diff)
}
if diff := cmp.Diff(tc.oldConfig, d.getConfig()); diff != "" {
t.Errorf("unexpected config (-want, +got) = %v", diff)
}
if diff := cmp.Diff(tc.oldHostToChanMap, d.getHostToChannelMap()); diff != "" {
t.Errorf("unexpected hostToChannelMap (-want, +got) = %v", diff)
}
Expand Down Expand Up @@ -450,9 +447,6 @@ func TestDispatcher_UpdateConfig(t *testing.T) {
if diff := cmp.Diff(tc.newHostToChanMap, d.getHostToChannelMap()); diff != "" {
t.Errorf("unexpected hostToChannelMap (-want, +got) = %v", diff)
}
if diff := cmp.Diff(tc.newConfig, d.getConfig()); diff != "" {
t.Errorf("unexpected config (-want, +got) = %v", diff)
}

})
}
Expand Down
Loading