Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,30 @@ fork](https://help.github.com/articles/syncing-a-fork/)._

Once you reach this point you are ready to do a full build and deploy as follows.

### Configure Istio per namespace

Clusters not using Istio may skip this step.

Each namespace that contains a Channel needs to be configured so that Istio will bridge the service mesh for a Channel to the Bus hosting that channel. The following snippet will need to be applied to each namespace that will use a Channel. Replace $NAMESPACE both in the metadata struct and the service hosts.

```yaml
apiVersion: networking.istio.io/v1alpha3
kind: ServiceEntry
metadata:
name: knative-channels
namespace: $NAMESPACE
spec:
hosts:
- '*-channel.$NAMESPACE.svc.cluster.local'
location: MESH_INTERNAL
ports:
- name: http
number: 80
protocol: HTTP
```

This ServiceEntry extends the Istio mesh to include Channels.

## Starting Eventing Controller

Once you've [setup your development environment](#getting-started), stand up
Expand Down
21 changes: 4 additions & 17 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions cmd/controller/controller-runtime-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"github.com/knative/eventing/pkg/controller/feed"
"github.com/knative/eventing/pkg/controller/flow"

istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"

"github.com/knative/eventing/pkg/controller/eventtype"
"k8s.io/apimachinery/pkg/runtime"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
Expand Down Expand Up @@ -57,7 +55,6 @@ func controllerRuntimeStart() error {
channelsv1alpha1.AddToScheme,
feedsv1alpha1.AddToScheme,
flowsv1alpha1.AddToScheme,
istiov1alpha3.AddToScheme,
}
for _, schemeFunc := range schemeFuncs {
schemeFunc(mrg.GetScheme())
Expand Down
12 changes: 2 additions & 10 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

Expand All @@ -36,8 +37,6 @@ import (
"github.com/knative/eventing/pkg/controller/bus"
"github.com/knative/eventing/pkg/controller/channel"
"github.com/knative/eventing/pkg/controller/clusterbus"
sharedclientset "github.com/knative/pkg/client/clientset/versioned"
sharedinformers "github.com/knative/pkg/client/informers/externalversions"
"github.com/knative/pkg/signals"

"github.com/knative/eventing/pkg/logconfig"
Expand Down Expand Up @@ -96,11 +95,6 @@ func main() {
logger.Fatalf("Error building clientset: %v", err)
}

sharedClient, err := sharedclientset.NewForConfig(cfg)
if err != nil {
logger.Fatalf("Error building shared clientset: %v", err)
}

// TODO: Rip this out from all the controllers since we can get it
// from provider.
// Build a rest.Config from configuration injected into the Pod by
Expand All @@ -112,7 +106,6 @@ func main() {

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)
sharedInformerFactory := sharedinformers.NewSharedInformerFactory(sharedClient, time.Second*30)

// Watch the logging config map and dynamically update logging levels.
configMapWatcher := configmap.NewDefaultWatcher(kubeClient, system.Namespace)
Expand All @@ -134,12 +127,11 @@ func main() {
controllers := make([]controller.Interface, 0, len(ctors))
for _, ctor := range ctors {
controllers = append(controllers,
ctor(kubeClient, client, sharedClient, restConfig, kubeInformerFactory, informerFactory, sharedInformerFactory))
ctor(kubeClient, client, restConfig, kubeInformerFactory, informerFactory))
}

go kubeInformerFactory.Start(stopCh)
go informerFactory.Start(stopCh)
go sharedInformerFactory.Start(stopCh)

// Start all of the controllers.
for _, ctrlr := range controllers {
Expand Down
7 changes: 0 additions & 7 deletions pkg/apis/channels/v1alpha1/channel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,6 @@ const (
// Serviceable means the service addressing the channel exists.
ChannelServiceable ChannelConditionType = "Serviceable"

// Routable means the virtual service forwarding traffic from the channel service to the
// bus is created.
ChannelRoutable ChannelConditionType = "Routeable"

// Provisioned means the channel backing construct on the bus middleware has been set up.
ChannelProvisioned ChannelConditionType = "Provisioned"
)
Expand All @@ -114,9 +110,6 @@ type ChannelStatus struct {
// A reference to the k8s Service backing this channel, if successfully synced.
Service *v1.LocalObjectReference `json:"service,omitempty"`

// A reference to the istio VirtualService backing this channel, if successfully synced.
VirtualService *v1.LocalObjectReference `json:"virtualService,omitempty"`

// Represents the latest available observations of a channel's current state.
// +patchMergeKey=type
// +patchStrategy=merge
Expand Down
9 changes: 0 additions & 9 deletions pkg/apis/channels/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions pkg/apis/flows/v1alpha1/flow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"encoding/json"

channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1"
feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1"
"github.com/knative/pkg/apis"
Expand Down Expand Up @@ -72,8 +73,9 @@ type FlowSpec struct {
// provide the resolved target of the action. Currently we inspect
// the objects Status and see if there's a predefined Status field
// that we will then use to give to Feed object as the target. Currently
// must resolve to a k8s service or Istio virtual service. Note that by
// in the future we should try to utilize subresources (/resolve ?) to
// must resolve to a k8s service.
//
// In the future we should try to utilize subresources (/resolve ?) to
// utilize this, but CRDs do not support subresources yet, so we need
// to rely on a specified Status field today. By relying on this behaviour
// we can utilize a dynamic client instead of having to understand all
Expand Down
11 changes: 6 additions & 5 deletions pkg/buses/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ func NewBusDispatcher(ref BusReference, handlerFuncs EventHandlerFuncs, opts *Bu
opts.MessageDispatcher = NewMessageDispatcher(opts.Logger.Named(dispatcherLoggingComponent))
}
if opts.MessageReceiver == nil {
opts.MessageReceiver = NewMessageReceiver(func(channel ChannelReference, message *Message) error {
return b.receiveMessage(channel, message)
opts.MessageReceiver = NewMessageReceiver(ref, func(host ChannelHostReference, message *Message) error {
return b.receiveMessage(host, message)
}, opts.Logger.Named(receiverLoggingComponent))
}

Expand Down Expand Up @@ -171,12 +171,13 @@ func (b bus) Run(threadiness int, stopCh <-chan struct{}) {
<-stopCh
}

func (b *bus) receiveMessage(channel ChannelReference, message *Message) error {
_, err := b.cache.Channel(channel)
func (b *bus) receiveMessage(host ChannelHostReference, message *Message) error {
channel, err := b.cache.ChannelHost(host)
if err != nil {
return ErrUnknownChannel
}
return b.handlerFuncs.onReceiveMessage(channel, message)
ref := NewChannelReference(channel)
return b.handlerFuncs.onReceiveMessage(ref, message)
}

// DispatchMessage sends a message to a subscriber. This function is only
Expand Down
21 changes: 21 additions & 0 deletions pkg/buses/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
func NewCache() *Cache {
return &Cache{
channels: make(map[ChannelReference]*channelsv1alpha1.Channel),
channelHosts: make(map[ChannelHostReference]ChannelReference),
subscriptions: make(map[SubscriptionReference]*channelsv1alpha1.Subscription),
}
}
Expand All @@ -36,6 +37,7 @@ func NewCache() *Cache {
// provisioned and comparing updated resources to the provisioned version.
type Cache struct {
channels map[ChannelReference]*channelsv1alpha1.Channel
channelHosts map[ChannelHostReference]ChannelReference
subscriptions map[SubscriptionReference]*channelsv1alpha1.Subscription
}

Expand All @@ -49,6 +51,16 @@ func (c *Cache) Channel(ref ChannelReference) (*channelsv1alpha1.Channel, error)
return channel, nil
}

// ChannelHost returns a cached channel for a provided channel host reference
// or an error if the channel host is not in the cache.
func (c *Cache) ChannelHost(host ChannelHostReference) (*channelsv1alpha1.Channel, error) {
ref, ok := c.channelHosts[host]
if !ok {
return nil, fmt.Errorf("unknown channel host %q", host.String())
}
return c.Channel(ref)
}

// Subscription returns a cached subscription for provided reference or an
// error if the subscription is not in the cache.
func (c *Cache) Subscription(ref SubscriptionReference) (*channelsv1alpha1.Subscription, error) {
Expand All @@ -67,6 +79,10 @@ func (c *Cache) AddChannel(channel *channelsv1alpha1.Channel) {
}
ref := NewChannelReference(channel)
c.channels[ref] = channel
if host, err := NewChannelHostReferenceFromChannel(channel); err == nil {
// an error is expected if the channel is not serviceable yet
c.channelHosts[host] = ref
}
}

// RemoveChannel removes the provided channel from the cache.
Expand All @@ -76,6 +92,11 @@ func (c *Cache) RemoveChannel(channel *channelsv1alpha1.Channel) {
}
ref := NewChannelReference(channel)
delete(c.channels, ref)
if host, err := NewChannelHostReferenceFromChannel(channel); err != nil {
// it's ok if key is abandoned in the channelHost cache after being
// removed from the channel cache, but we should try to clean it up.
delete(c.channelHosts, host)
}
}

// AddSubscription adds, or updates, the provided subscription to the cache for
Expand Down
Loading