Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
5085643
Import Broker, Stream and Subscriptions CRDs
scothis May 30, 2018
b2b526c
Add controllers for new CRDs
scothis May 31, 2018
c819788
Stub broker implementation
scothis May 31, 2018
0cac79b
Add hello and square samples
scothis May 31, 2018
9bf905c
Add subscription monitor
scothis May 31, 2018
d18efa6
Merge remote-tracking branch 'upstream/master' into stream-crd
scothis Jun 1, 2018
e2d423a
Add Streams to the Subscription Monitor
scothis Jun 1, 2018
ad3970d
Shift burden of tracking subscriptions and stream to broker
scothis Jun 1, 2018
04da06a
Merge pull request #1 from scothis/stream-crd-monitor
scothis Jun 5, 2018
1069cda
Cleanup lingering CRD group names
scothis Jun 5, 2018
dcbab6a
Rename Stream to Channel
scothis Jun 5, 2018
e39fd26
Rename Broker to Bus
scothis Jun 5, 2018
efd068e
Drop 'Busless' support
scothis Jun 5, 2018
33866d6
Merge remote-tracking branch 'upstream/master' into stream-crd
scothis Jun 5, 2018
4b5eb8f
Move channel CRDs into channels.eventing.knative.dev group
scothis Jun 5, 2018
c9118fa
polish
scothis Jun 7, 2018
9f3fa72
Split bus provissioner from event handler
scothis Jun 7, 2018
20d1088
Merge remote-tracking branch 'upstream/master' into stream-crd
scothis Jun 7, 2018
de07ce8
Provisioner polish
scothis Jun 7, 2018
a45911d
Descope bus service account to read-only for bus, channel and subscri…
scothis Jun 7, 2018
0972a93
Parameters/arguments for bus->channel->subscription relationships
scothis Jun 7, 2018
9262da3
Add missing license headers
scothis Jun 8, 2018
ac6c8ff
Rename group channels.eventing.knative.dev to channels.knative.dev
scothis Jun 8, 2018
41887ee
Create a single sample that subscribes an Knative serivce to a channel
scothis Jun 8, 2018
0695c25
polish
scothis Jun 8, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
informers "github.com/knative/eventing/pkg/client/informers/externalversions"
"github.com/knative/eventing/pkg/controller"
"github.com/knative/eventing/pkg/controller/bind"
"github.com/knative/eventing/pkg/controller/bus"
"github.com/knative/eventing/pkg/controller/channel"
"github.com/knative/eventing/pkg/signals"

"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -85,6 +87,8 @@ func main() {
// Add new controllers here.
ctors := []controller.Constructor{
bind.NewController,
bus.NewController,
channel.NewController,
}

// Build all of our controllers, with the clients constructed above.
Expand Down
161 changes: 161 additions & 0 deletions cmd/stub-bus/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"bytes"
"flag"
"fmt"
"io/ioutil"
"net/http"
"os"
"strings"
"time"

"github.com/go-martini/martini"
"github.com/golang/glog"
channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1"
clientset "github.com/knative/eventing/pkg/client/clientset/versioned"
informers "github.com/knative/eventing/pkg/client/informers/externalversions"
"github.com/knative/eventing/pkg/signals"
"github.com/knative/eventing/pkg/subscription"
"k8s.io/client-go/tools/clientcmd"
)

var (
masterURL string
kubeconfig string

bus = os.Getenv("BUS_NAME")
forwardHeaders = []string{
"content-type",
"x-request-id",
"x-b3-traceid",
"x-b3-spanid",
"x-b3-parentspanid",
"x-b3-sampled",
"x-b3-flags",
"x-ot-span-context",
}
)

func splitChannelName(host string) (string, string) {
chunks := strings.Split(host, ".")
channel := chunks[0]
namespace := chunks[1]
return channel, namespace
}

func main() {
flag.Parse()

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
glog.Fatalf("Error building kubeconfig: %s", err.Error())
}

client, err := clientset.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building clientset: %s", err.Error())
}

informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)
monitor := subscription.NewMonitor(bus, informerFactory, subscription.MonitorEventHandlerFuncs{
ProvisionFunc: func(channel channelsv1alpha1.Channel) {
glog.Infof("Provision channel %q\n", channel.Name)
},
UnprovisionFunc: func(channel channelsv1alpha1.Channel) {
glog.Infof("Unprovision channel %q\n", channel.Name)
},
SubscribeFunc: func(subscription channelsv1alpha1.Subscription) {
glog.Infof("Subscribe %q to %q channel\n", subscription.Spec.Subscriber, subscription.Spec.Channel)
},
UnsubscribeFunc: func(subscription channelsv1alpha1.Subscription) {
glog.Infof("Unubscribe %q from %q channel\n", subscription.Spec.Subscriber, subscription.Spec.Channel)
},
})
go informerFactory.Start(stopCh)

m := createServer(monitor)
m.Run()

glog.Flush()
}

func createServer(monitor *subscription.Monitor) *martini.ClassicMartini {
m := martini.Classic()

m.Post("/", func(req *http.Request, res http.ResponseWriter) {
host := req.Host
glog.Infof("Recieved request for %s\n", host)
channel, namespace := splitChannelName(host)
subscriptions := monitor.Subscriptions(channel, namespace)
if subscriptions == nil {
res.WriteHeader(http.StatusNotFound)
return
}

body, err := ioutil.ReadAll(req.Body)
if err != nil {
res.WriteHeader(http.StatusInternalServerError)
return
}

res.WriteHeader(http.StatusAccepted)
go func() {
if len(*subscriptions) == 0 {
glog.Warningf("No subscribers for channel %q\n", channel)
}

// make upstream requests
client := &http.Client{}

for _, subscription := range *subscriptions {
go func(subscriber string) {
glog.Infof("Sending to %q for %q\n", subscriber, channel)

url := fmt.Sprintf("http://%s/", subscriber)
request, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
if err != nil {
glog.Errorf("Unable to create subscriber request %v", err)
}
request.Header.Set("x-bus", bus)
request.Header.Set("x-channel", channel)
for _, header := range forwardHeaders {
if value := req.Header.Get(header); value != "" {
request.Header.Set(header, value)
}
}
_, err = client.Do(request)
if err != nil {
glog.Errorf("Unable to complete subscriber request %v", err)
}
}(subscription.Subscriber)
}
}()
})

return m
}

func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
25 changes: 25 additions & 0 deletions config/bus.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2018 Google, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: buses.channels.knative.dev
spec:
scope: Namespaced
group: channels.knative.dev
version: v1alpha1
names:
kind: Bus
plural: buses
singular: bus
25 changes: 25 additions & 0 deletions config/buses/stub.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2018 Google, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: channels.knative.dev/v1alpha1
kind: Bus
metadata:
name: stub
spec:
dispatcher:
name: dispatcher
image: github.com/knative/eventing/cmd/stub-bus
args: [
"-logtostderr",
"-stderrthreshold", "INFO",
]
25 changes: 25 additions & 0 deletions config/channel.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2018 Google, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: channels.channels.knative.dev
spec:
scope: Namespaced
group: channels.knative.dev
version: v1alpha1
names:
kind: Channel
plural: channels
singular: channel
21 changes: 21 additions & 0 deletions config/clusterrole.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Copyright 2018 Google, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: knative-channels-bus
rules:
- apiGroups: ["channels.knative.dev"]
resources: ["buses", "channels", "subscriptions"]
verbs: ["get", "watch", "list"]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, missing newline here.

25 changes: 25 additions & 0 deletions config/subscription.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Copyright 2018 Google, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: subscriptions.channels.knative.dev
spec:
scope: Namespaced
group: channels.knative.dev
version: v1alpha1
names:
kind: Subscription
plural: subscriptions
singular: subscription
2 changes: 1 addition & 1 deletion hack/update-codegen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ CODEGEN_PKG=${CODEGEN_PKG:-$(cd ${SCRIPT_ROOT}; ls -d -1 ./vendor/k8s.io/code-ge
# instead of the $GOPATH directly. For normal projects this can be dropped.
${CODEGEN_PKG}/generate-groups.sh "deepcopy,client,informer,lister" \
github.com/knative/eventing/pkg/client github.com/knative/eventing/pkg/apis \
feeds:v1alpha1 \
"channels:v1alpha1 feeds:v1alpha1 istio:v1alpha2" \
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see Istio being used anywhere but perhaps I missed it? Rest of this seems fine, but just trying to understand why we need to import Istio in.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope to get rid of the custom Istio client in the near future. I would have used the client created by serving, except that it doesn't have the struct for rewriting the http host in a RouteRule.

I can drive the PR in serving to completion and then drop this client.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There it was ! :)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/cc @tcnghia who is working on importing the Istio 0.8 types into serving.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I filed a bug, with the hope that we only need to vendor/ this stuff in the future. istio/istio#6084

In the mean time, for istio v1alpha3 we will need to do the same (cloning _types.go from *.proto files and run codegen)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you use the client in /serving you'll benefit from my istio/v1alpha3 PR (coming). I am translating everything, so you should have access to all features, unlike the version we hae for istio/v1alpha2

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tcnghia Thanks. I'll keep a look out for the PR in serving.

--go-header-file ${SCRIPT_ROOT}/hack/boilerplate/boilerplate.go.txt

# Make sure our dependencies are up-to-date
Expand Down
21 changes: 21 additions & 0 deletions pkg/apis/channels/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
Copyright 2018 Google, Inc. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package channels

const (
GroupName = "channels.knative.dev"
)
Loading