diff --git a/Gopkg.lock b/Gopkg.lock index 12c12955ee9..3d3b3314f23 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -689,6 +689,7 @@ [[projects]] name = "k8s.io/apimachinery" packages = [ + "pkg/api/equality", "pkg/api/errors", "pkg/api/meta", "pkg/api/resource", @@ -742,6 +743,7 @@ packages = [ "discovery", "discovery/fake", + "dynamic", "informers", "informers/admissionregistration", "informers/admissionregistration/v1alpha1", @@ -947,6 +949,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "bb7f31f1565718a02c0870386102ccf73042f5b4f0d1eb6857fe927a78a88864" + inputs-digest = "306367db1d23c58ed8fee3d130bbac10637d54f5730f6465a1cea5704ded7a9e" solver-name = "gps-cdcl" solver-version = 1 diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 537b9a8326b..f75ee11d824 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -25,6 +25,7 @@ import ( "github.com/golang/glog" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -39,6 +40,7 @@ import ( "github.com/knative/eventing/pkg/controller/channel" "github.com/knative/eventing/pkg/controller/clusterbus" "github.com/knative/eventing/pkg/controller/feed" + "github.com/knative/eventing/pkg/controller/flow" "github.com/knative/eventing/pkg/signals" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -81,6 +83,13 @@ func main() { glog.Fatalf("Error building serving clientset: %s", err.Error()) } + // Build a rest.Config from configuration injected into the Pod by + // Kubernetes. Clients will use the Pod's ServiceAccount principal. + restConfig, err := rest.InClusterConfig() + if err != nil { + glog.Fatalf("Error building rest config: %v", err.Error()) + } + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) informerFactory := informers.NewSharedInformerFactory(client, time.Second*30) servingInformerFactory := servinginformers.NewSharedInformerFactory(servingClient, time.Second*30) @@ -88,6 +97,7 @@ func main() { // Add new controllers here. ctors := []controller.Constructor{ feed.NewController, + flow.NewController, bus.NewController, clusterbus.NewController, channel.NewController, @@ -97,7 +107,7 @@ func main() { controllers := make([]controller.Interface, 0, len(ctors)) for _, ctor := range ctors { controllers = append(controllers, - ctor(kubeClient, client, servingClient, kubeInformerFactory, informerFactory, servingInformerFactory)) + ctor(kubeClient, client, servingClient, restConfig, kubeInformerFactory, informerFactory, servingInformerFactory)) } go kubeInformerFactory.Start(stopCh) diff --git a/config/flow.yaml b/config/flow.yaml new file mode 100644 index 00000000000..1a003ab2942 --- /dev/null +++ b/config/flow.yaml @@ -0,0 +1,24 @@ +# Copyright 2018 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +apiVersion: apiextensions.k8s.io/v1beta1 +kind: CustomResourceDefinition +metadata: + name: flows.flows.knative.dev +spec: + group: flows.knative.dev + version: v1alpha1 + names: + kind: Flow + plural: flows + scope: Namespaced diff --git a/hack/update-codegen.sh b/hack/update-codegen.sh index 18e003263bb..beb88c06edf 100755 --- a/hack/update-codegen.sh +++ b/hack/update-codegen.sh @@ -27,7 +27,7 @@ CODEGEN_PKG=${CODEGEN_PKG:-$(cd ${SCRIPT_ROOT}; ls -d -1 ./vendor/k8s.io/code-ge # instead of the $GOPATH directly. For normal projects this can be dropped. ${CODEGEN_PKG}/generate-groups.sh "deepcopy,client,informer,lister" \ github.com/knative/eventing/pkg/client github.com/knative/eventing/pkg/apis \ - "channels:v1alpha1 feeds:v1alpha1" \ + "channels:v1alpha1 feeds:v1alpha1 flows:v1alpha1" \ --go-header-file ${SCRIPT_ROOT}/hack/boilerplate/boilerplate.go.txt # Make sure our dependencies are up-to-date diff --git a/pkg/apis/flows/v1alpha1/flow_types.go b/pkg/apis/flows/v1alpha1/flow_types.go index 05122996fcf..40e0853499b 100644 --- a/pkg/apis/flows/v1alpha1/flow_types.go +++ b/pkg/apis/flows/v1alpha1/flow_types.go @@ -20,6 +20,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" "k8s.io/apimachinery/pkg/runtime" ) @@ -162,24 +163,7 @@ type EventTrigger struct { // by the actual trigger actuator. // NOTE: experimental field. All secrets in ParametersFrom will be // resolved and given to event sources in the Parameters field. - ParametersFrom []ParametersFromSource `json:"parametersFrom,omitempty"` -} - -// ParametersFromSource represents the source of a set of Parameters -// TODO: consider making this into a new secret type. -type ParametersFromSource struct { - // The Secret key to select from. - // The value must be a JSON object. - //+optional - SecretKeyRef *SecretKeyReference `json:"secretKeyRef,omitempty"` -} - -// SecretKeyReference references a key of a Secret. -type SecretKeyReference struct { - // The name of the secret in the resource's namespace to select from. - Name string `json:"name"` - // The key of the secret to select from. Must be a valid secret key. - Key string `json:"key"` + ParametersFrom []feedsv1alpha1.ParametersFromSource `json:"parametersFrom,omitempty"` } // FlowStatus is the status for a Flow resource diff --git a/pkg/apis/flows/v1alpha1/register.go b/pkg/apis/flows/v1alpha1/register.go new file mode 100644 index 00000000000..918c85b031c --- /dev/null +++ b/pkg/apis/flows/v1alpha1/register.go @@ -0,0 +1,53 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "github.com/knative/eventing/pkg/apis/flows" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// SchemeGroupVersion is group version used to register these objects +var SchemeGroupVersion = schema.GroupVersion{Group: flows.GroupName, Version: "v1alpha1"} + +// Kind takes an unqualified kind and returns back a Group qualified GroupKind +func Kind(kind string) schema.GroupKind { + return SchemeGroupVersion.WithKind(kind).GroupKind() +} + +// Resource takes an unqualified resource and returns a Group qualified GroupResource +func Resource(resource string) schema.GroupResource { + return SchemeGroupVersion.WithResource(resource).GroupResource() +} + +var ( + SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) + AddToScheme = SchemeBuilder.AddToScheme +) + +// Adds the list of known types to Scheme. +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &Flow{}, + &FlowList{}, + ) + metav1.AddToGroupVersion(scheme, SchemeGroupVersion) + return nil +} diff --git a/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go new file mode 100644 index 00000000000..03bae2eb6b5 --- /dev/null +++ b/pkg/apis/flows/v1alpha1/zz_generated.deepcopy.go @@ -0,0 +1,218 @@ +// +build !ignore_autogenerated + +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + feeds_v1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" + v1 "k8s.io/api/core/v1" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventTrigger) DeepCopyInto(out *EventTrigger) { + *out = *in + if in.Parameters != nil { + in, out := &in.Parameters, &out.Parameters + if *in == nil { + *out = nil + } else { + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } + } + if in.ParametersFrom != nil { + in, out := &in.ParametersFrom, &out.ParametersFrom + *out = make([]feeds_v1alpha1.ParametersFromSource, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EventTrigger. +func (in *EventTrigger) DeepCopy() *EventTrigger { + if in == nil { + return nil + } + out := new(EventTrigger) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Flow) DeepCopyInto(out *Flow) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Flow. +func (in *Flow) DeepCopy() *Flow { + if in == nil { + return nil + } + out := new(Flow) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *Flow) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlowAction) DeepCopyInto(out *FlowAction) { + *out = *in + if in.Target != nil { + in, out := &in.Target, &out.Target + if *in == nil { + *out = nil + } else { + *out = new(v1.ObjectReference) + **out = **in + } + } + if in.TargetURI != nil { + in, out := &in.TargetURI, &out.TargetURI + if *in == nil { + *out = nil + } else { + *out = new(string) + **out = **in + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowAction. +func (in *FlowAction) DeepCopy() *FlowAction { + if in == nil { + return nil + } + out := new(FlowAction) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlowCondition) DeepCopyInto(out *FlowCondition) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowCondition. +func (in *FlowCondition) DeepCopy() *FlowCondition { + if in == nil { + return nil + } + out := new(FlowCondition) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlowList) DeepCopyInto(out *FlowList) { + *out = *in + out.TypeMeta = in.TypeMeta + out.ListMeta = in.ListMeta + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]Flow, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowList. +func (in *FlowList) DeepCopy() *FlowList { + if in == nil { + return nil + } + out := new(FlowList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *FlowList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlowSpec) DeepCopyInto(out *FlowSpec) { + *out = *in + in.Action.DeepCopyInto(&out.Action) + in.Trigger.DeepCopyInto(&out.Trigger) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowSpec. +func (in *FlowSpec) DeepCopy() *FlowSpec { + if in == nil { + return nil + } + out := new(FlowSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlowStatus) DeepCopyInto(out *FlowStatus) { + *out = *in + if in.Conditions != nil { + in, out := &in.Conditions, &out.Conditions + *out = make([]FlowCondition, len(*in)) + copy(*out, *in) + } + if in.FlowContext != nil { + in, out := &in.FlowContext, &out.FlowContext + if *in == nil { + *out = nil + } else { + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlowStatus. +func (in *FlowStatus) DeepCopy() *FlowStatus { + if in == nil { + return nil + } + out := new(FlowStatus) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/client/clientset/versioned/clientset.go b/pkg/client/clientset/versioned/clientset.go index ae51f0e4c98..28e1887d3b7 100644 --- a/pkg/client/clientset/versioned/clientset.go +++ b/pkg/client/clientset/versioned/clientset.go @@ -22,6 +22,7 @@ import ( glog "github.com/golang/glog" channelsv1alpha1 "github.com/knative/eventing/pkg/client/clientset/versioned/typed/channels/v1alpha1" feedsv1alpha1 "github.com/knative/eventing/pkg/client/clientset/versioned/typed/feeds/v1alpha1" + flowsv1alpha1 "github.com/knative/eventing/pkg/client/clientset/versioned/typed/flows/v1alpha1" discovery "k8s.io/client-go/discovery" rest "k8s.io/client-go/rest" flowcontrol "k8s.io/client-go/util/flowcontrol" @@ -35,6 +36,9 @@ type Interface interface { FeedsV1alpha1() feedsv1alpha1.FeedsV1alpha1Interface // Deprecated: please explicitly pick a version if possible. Feeds() feedsv1alpha1.FeedsV1alpha1Interface + FlowsV1alpha1() flowsv1alpha1.FlowsV1alpha1Interface + // Deprecated: please explicitly pick a version if possible. + Flows() flowsv1alpha1.FlowsV1alpha1Interface } // Clientset contains the clients for groups. Each group has exactly one @@ -43,6 +47,7 @@ type Clientset struct { *discovery.DiscoveryClient channelsV1alpha1 *channelsv1alpha1.ChannelsV1alpha1Client feedsV1alpha1 *feedsv1alpha1.FeedsV1alpha1Client + flowsV1alpha1 *flowsv1alpha1.FlowsV1alpha1Client } // ChannelsV1alpha1 retrieves the ChannelsV1alpha1Client @@ -67,6 +72,17 @@ func (c *Clientset) Feeds() feedsv1alpha1.FeedsV1alpha1Interface { return c.feedsV1alpha1 } +// FlowsV1alpha1 retrieves the FlowsV1alpha1Client +func (c *Clientset) FlowsV1alpha1() flowsv1alpha1.FlowsV1alpha1Interface { + return c.flowsV1alpha1 +} + +// Deprecated: Flows retrieves the default version of FlowsClient. +// Please explicitly pick a version. +func (c *Clientset) Flows() flowsv1alpha1.FlowsV1alpha1Interface { + return c.flowsV1alpha1 +} + // Discovery retrieves the DiscoveryClient func (c *Clientset) Discovery() discovery.DiscoveryInterface { if c == nil { @@ -91,6 +107,10 @@ func NewForConfig(c *rest.Config) (*Clientset, error) { if err != nil { return nil, err } + cs.flowsV1alpha1, err = flowsv1alpha1.NewForConfig(&configShallowCopy) + if err != nil { + return nil, err + } cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy) if err != nil { @@ -106,6 +126,7 @@ func NewForConfigOrDie(c *rest.Config) *Clientset { var cs Clientset cs.channelsV1alpha1 = channelsv1alpha1.NewForConfigOrDie(c) cs.feedsV1alpha1 = feedsv1alpha1.NewForConfigOrDie(c) + cs.flowsV1alpha1 = flowsv1alpha1.NewForConfigOrDie(c) cs.DiscoveryClient = discovery.NewDiscoveryClientForConfigOrDie(c) return &cs @@ -116,6 +137,7 @@ func New(c rest.Interface) *Clientset { var cs Clientset cs.channelsV1alpha1 = channelsv1alpha1.New(c) cs.feedsV1alpha1 = feedsv1alpha1.New(c) + cs.flowsV1alpha1 = flowsv1alpha1.New(c) cs.DiscoveryClient = discovery.NewDiscoveryClient(c) return &cs diff --git a/pkg/client/clientset/versioned/fake/clientset_generated.go b/pkg/client/clientset/versioned/fake/clientset_generated.go index b83d8d15052..43350c1220b 100644 --- a/pkg/client/clientset/versioned/fake/clientset_generated.go +++ b/pkg/client/clientset/versioned/fake/clientset_generated.go @@ -24,6 +24,8 @@ import ( fakechannelsv1alpha1 "github.com/knative/eventing/pkg/client/clientset/versioned/typed/channels/v1alpha1/fake" feedsv1alpha1 "github.com/knative/eventing/pkg/client/clientset/versioned/typed/feeds/v1alpha1" fakefeedsv1alpha1 "github.com/knative/eventing/pkg/client/clientset/versioned/typed/feeds/v1alpha1/fake" + flowsv1alpha1 "github.com/knative/eventing/pkg/client/clientset/versioned/typed/flows/v1alpha1" + fakeflowsv1alpha1 "github.com/knative/eventing/pkg/client/clientset/versioned/typed/flows/v1alpha1/fake" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" @@ -91,3 +93,13 @@ func (c *Clientset) FeedsV1alpha1() feedsv1alpha1.FeedsV1alpha1Interface { func (c *Clientset) Feeds() feedsv1alpha1.FeedsV1alpha1Interface { return &fakefeedsv1alpha1.FakeFeedsV1alpha1{Fake: &c.Fake} } + +// FlowsV1alpha1 retrieves the FlowsV1alpha1Client +func (c *Clientset) FlowsV1alpha1() flowsv1alpha1.FlowsV1alpha1Interface { + return &fakeflowsv1alpha1.FakeFlowsV1alpha1{Fake: &c.Fake} +} + +// Flows retrieves the FlowsV1alpha1Client +func (c *Clientset) Flows() flowsv1alpha1.FlowsV1alpha1Interface { + return &fakeflowsv1alpha1.FakeFlowsV1alpha1{Fake: &c.Fake} +} diff --git a/pkg/client/clientset/versioned/fake/register.go b/pkg/client/clientset/versioned/fake/register.go index 3c2e7771fd1..d4f9991e3ca 100644 --- a/pkg/client/clientset/versioned/fake/register.go +++ b/pkg/client/clientset/versioned/fake/register.go @@ -21,6 +21,7 @@ package fake import ( channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" + flowsv1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" schema "k8s.io/apimachinery/pkg/runtime/schema" @@ -53,4 +54,5 @@ func init() { func AddToScheme(scheme *runtime.Scheme) { channelsv1alpha1.AddToScheme(scheme) feedsv1alpha1.AddToScheme(scheme) + flowsv1alpha1.AddToScheme(scheme) } diff --git a/pkg/client/clientset/versioned/scheme/register.go b/pkg/client/clientset/versioned/scheme/register.go index 172f530f63c..19d55694dd3 100644 --- a/pkg/client/clientset/versioned/scheme/register.go +++ b/pkg/client/clientset/versioned/scheme/register.go @@ -21,6 +21,7 @@ package scheme import ( channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" + flowsv1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" schema "k8s.io/apimachinery/pkg/runtime/schema" @@ -53,4 +54,5 @@ func init() { func AddToScheme(scheme *runtime.Scheme) { channelsv1alpha1.AddToScheme(scheme) feedsv1alpha1.AddToScheme(scheme) + flowsv1alpha1.AddToScheme(scheme) } diff --git a/pkg/client/clientset/versioned/typed/flows/v1alpha1/doc.go b/pkg/client/clientset/versioned/typed/flows/v1alpha1/doc.go new file mode 100644 index 00000000000..75445c17900 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/flows/v1alpha1/doc.go @@ -0,0 +1,20 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +// This package has the automatically generated typed clients. +package v1alpha1 diff --git a/pkg/client/clientset/versioned/typed/flows/v1alpha1/fake/doc.go b/pkg/client/clientset/versioned/typed/flows/v1alpha1/fake/doc.go new file mode 100644 index 00000000000..128aa183a91 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/flows/v1alpha1/fake/doc.go @@ -0,0 +1,20 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +// Package fake has the automatically generated clients. +package fake diff --git a/pkg/client/clientset/versioned/typed/flows/v1alpha1/fake/fake_flow.go b/pkg/client/clientset/versioned/typed/flows/v1alpha1/fake/fake_flow.go new file mode 100644 index 00000000000..f9c45f274b0 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/flows/v1alpha1/fake/fake_flow.go @@ -0,0 +1,140 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeFlows implements FlowInterface +type FakeFlows struct { + Fake *FakeFlowsV1alpha1 + ns string +} + +var flowsResource = schema.GroupVersionResource{Group: "flows.knative.dev", Version: "v1alpha1", Resource: "flows"} + +var flowsKind = schema.GroupVersionKind{Group: "flows.knative.dev", Version: "v1alpha1", Kind: "Flow"} + +// Get takes name of the flow, and returns the corresponding flow object, and an error if there is any. +func (c *FakeFlows) Get(name string, options v1.GetOptions) (result *v1alpha1.Flow, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(flowsResource, c.ns, name), &v1alpha1.Flow{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Flow), err +} + +// List takes label and field selectors, and returns the list of Flows that match those selectors. +func (c *FakeFlows) List(opts v1.ListOptions) (result *v1alpha1.FlowList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(flowsResource, flowsKind, c.ns, opts), &v1alpha1.FlowList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.FlowList{} + for _, item := range obj.(*v1alpha1.FlowList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested flows. +func (c *FakeFlows) Watch(opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(flowsResource, c.ns, opts)) + +} + +// Create takes the representation of a flow and creates it. Returns the server's representation of the flow, and an error, if there is any. +func (c *FakeFlows) Create(flow *v1alpha1.Flow) (result *v1alpha1.Flow, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(flowsResource, c.ns, flow), &v1alpha1.Flow{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Flow), err +} + +// Update takes the representation of a flow and updates it. Returns the server's representation of the flow, and an error, if there is any. +func (c *FakeFlows) Update(flow *v1alpha1.Flow) (result *v1alpha1.Flow, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(flowsResource, c.ns, flow), &v1alpha1.Flow{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Flow), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakeFlows) UpdateStatus(flow *v1alpha1.Flow) (*v1alpha1.Flow, error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateSubresourceAction(flowsResource, "status", c.ns, flow), &v1alpha1.Flow{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Flow), err +} + +// Delete takes name of the flow and deletes it. Returns an error if one occurs. +func (c *FakeFlows) Delete(name string, options *v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(flowsResource, c.ns, name), &v1alpha1.Flow{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeFlows) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(flowsResource, c.ns, listOptions) + + _, err := c.Fake.Invokes(action, &v1alpha1.FlowList{}) + return err +} + +// Patch applies the patch and returns the patched flow. +func (c *FakeFlows) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Flow, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(flowsResource, c.ns, name, data, subresources...), &v1alpha1.Flow{}) + + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.Flow), err +} diff --git a/pkg/client/clientset/versioned/typed/flows/v1alpha1/fake/fake_flows_client.go b/pkg/client/clientset/versioned/typed/flows/v1alpha1/fake/fake_flows_client.go new file mode 100644 index 00000000000..266afaff88b --- /dev/null +++ b/pkg/client/clientset/versioned/typed/flows/v1alpha1/fake/fake_flows_client.go @@ -0,0 +1,40 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1alpha1 "github.com/knative/eventing/pkg/client/clientset/versioned/typed/flows/v1alpha1" + rest "k8s.io/client-go/rest" + testing "k8s.io/client-go/testing" +) + +type FakeFlowsV1alpha1 struct { + *testing.Fake +} + +func (c *FakeFlowsV1alpha1) Flows(namespace string) v1alpha1.FlowInterface { + return &FakeFlows{c, namespace} +} + +// RESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *FakeFlowsV1alpha1) RESTClient() rest.Interface { + var ret *rest.RESTClient + return ret +} diff --git a/pkg/client/clientset/versioned/typed/flows/v1alpha1/flow.go b/pkg/client/clientset/versioned/typed/flows/v1alpha1/flow.go new file mode 100644 index 00000000000..f687c0dd387 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/flows/v1alpha1/flow.go @@ -0,0 +1,174 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + scheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// FlowsGetter has a method to return a FlowInterface. +// A group's client should implement this interface. +type FlowsGetter interface { + Flows(namespace string) FlowInterface +} + +// FlowInterface has methods to work with Flow resources. +type FlowInterface interface { + Create(*v1alpha1.Flow) (*v1alpha1.Flow, error) + Update(*v1alpha1.Flow) (*v1alpha1.Flow, error) + UpdateStatus(*v1alpha1.Flow) (*v1alpha1.Flow, error) + Delete(name string, options *v1.DeleteOptions) error + DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error + Get(name string, options v1.GetOptions) (*v1alpha1.Flow, error) + List(opts v1.ListOptions) (*v1alpha1.FlowList, error) + Watch(opts v1.ListOptions) (watch.Interface, error) + Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Flow, err error) + FlowExpansion +} + +// flows implements FlowInterface +type flows struct { + client rest.Interface + ns string +} + +// newFlows returns a Flows +func newFlows(c *FlowsV1alpha1Client, namespace string) *flows { + return &flows{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the flow, and returns the corresponding flow object, and an error if there is any. +func (c *flows) Get(name string, options v1.GetOptions) (result *v1alpha1.Flow, err error) { + result = &v1alpha1.Flow{} + err = c.client.Get(). + Namespace(c.ns). + Resource("flows"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of Flows that match those selectors. +func (c *flows) List(opts v1.ListOptions) (result *v1alpha1.FlowList, err error) { + result = &v1alpha1.FlowList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("flows"). + VersionedParams(&opts, scheme.ParameterCodec). + Do(). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested flows. +func (c *flows) Watch(opts v1.ListOptions) (watch.Interface, error) { + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("flows"). + VersionedParams(&opts, scheme.ParameterCodec). + Watch() +} + +// Create takes the representation of a flow and creates it. Returns the server's representation of the flow, and an error, if there is any. +func (c *flows) Create(flow *v1alpha1.Flow) (result *v1alpha1.Flow, err error) { + result = &v1alpha1.Flow{} + err = c.client.Post(). + Namespace(c.ns). + Resource("flows"). + Body(flow). + Do(). + Into(result) + return +} + +// Update takes the representation of a flow and updates it. Returns the server's representation of the flow, and an error, if there is any. +func (c *flows) Update(flow *v1alpha1.Flow) (result *v1alpha1.Flow, err error) { + result = &v1alpha1.Flow{} + err = c.client.Put(). + Namespace(c.ns). + Resource("flows"). + Name(flow.Name). + Body(flow). + Do(). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). + +func (c *flows) UpdateStatus(flow *v1alpha1.Flow) (result *v1alpha1.Flow, err error) { + result = &v1alpha1.Flow{} + err = c.client.Put(). + Namespace(c.ns). + Resource("flows"). + Name(flow.Name). + SubResource("status"). + Body(flow). + Do(). + Into(result) + return +} + +// Delete takes name of the flow and deletes it. Returns an error if one occurs. +func (c *flows) Delete(name string, options *v1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("flows"). + Name(name). + Body(options). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *flows) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("flows"). + VersionedParams(&listOptions, scheme.ParameterCodec). + Body(options). + Do(). + Error() +} + +// Patch applies the patch and returns the patched flow. +func (c *flows) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.Flow, err error) { + result = &v1alpha1.Flow{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("flows"). + SubResource(subresources...). + Name(name). + Body(data). + Do(). + Into(result) + return +} diff --git a/pkg/client/clientset/versioned/typed/flows/v1alpha1/flows_client.go b/pkg/client/clientset/versioned/typed/flows/v1alpha1/flows_client.go new file mode 100644 index 00000000000..961c1d1972b --- /dev/null +++ b/pkg/client/clientset/versioned/typed/flows/v1alpha1/flows_client.go @@ -0,0 +1,90 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + serializer "k8s.io/apimachinery/pkg/runtime/serializer" + rest "k8s.io/client-go/rest" +) + +type FlowsV1alpha1Interface interface { + RESTClient() rest.Interface + FlowsGetter +} + +// FlowsV1alpha1Client is used to interact with features provided by the flows.knative.dev group. +type FlowsV1alpha1Client struct { + restClient rest.Interface +} + +func (c *FlowsV1alpha1Client) Flows(namespace string) FlowInterface { + return newFlows(c, namespace) +} + +// NewForConfig creates a new FlowsV1alpha1Client for the given config. +func NewForConfig(c *rest.Config) (*FlowsV1alpha1Client, error) { + config := *c + if err := setConfigDefaults(&config); err != nil { + return nil, err + } + client, err := rest.RESTClientFor(&config) + if err != nil { + return nil, err + } + return &FlowsV1alpha1Client{client}, nil +} + +// NewForConfigOrDie creates a new FlowsV1alpha1Client for the given config and +// panics if there is an error in the config. +func NewForConfigOrDie(c *rest.Config) *FlowsV1alpha1Client { + client, err := NewForConfig(c) + if err != nil { + panic(err) + } + return client +} + +// New creates a new FlowsV1alpha1Client for the given RESTClient. +func New(c rest.Interface) *FlowsV1alpha1Client { + return &FlowsV1alpha1Client{c} +} + +func setConfigDefaults(config *rest.Config) error { + gv := v1alpha1.SchemeGroupVersion + config.GroupVersion = &gv + config.APIPath = "/apis" + config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs} + + if config.UserAgent == "" { + config.UserAgent = rest.DefaultKubernetesUserAgent() + } + + return nil +} + +// RESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *FlowsV1alpha1Client) RESTClient() rest.Interface { + if c == nil { + return nil + } + return c.restClient +} diff --git a/pkg/client/clientset/versioned/typed/flows/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/flows/v1alpha1/generated_expansion.go new file mode 100644 index 00000000000..2e4de776975 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/flows/v1alpha1/generated_expansion.go @@ -0,0 +1,21 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +type FlowExpansion interface{} diff --git a/pkg/client/informers/externalversions/factory.go b/pkg/client/informers/externalversions/factory.go index 8e8d5bf4685..bfd5b3baa97 100644 --- a/pkg/client/informers/externalversions/factory.go +++ b/pkg/client/informers/externalversions/factory.go @@ -26,6 +26,7 @@ import ( versioned "github.com/knative/eventing/pkg/client/clientset/versioned" channels "github.com/knative/eventing/pkg/client/informers/externalversions/channels" feeds "github.com/knative/eventing/pkg/client/informers/externalversions/feeds" + flows "github.com/knative/eventing/pkg/client/informers/externalversions/flows" internalinterfaces "github.com/knative/eventing/pkg/client/informers/externalversions/internalinterfaces" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" @@ -126,6 +127,7 @@ type SharedInformerFactory interface { Channels() channels.Interface Feeds() feeds.Interface + Flows() flows.Interface } func (f *sharedInformerFactory) Channels() channels.Interface { @@ -135,3 +137,7 @@ func (f *sharedInformerFactory) Channels() channels.Interface { func (f *sharedInformerFactory) Feeds() feeds.Interface { return feeds.New(f, f.namespace, f.tweakListOptions) } + +func (f *sharedInformerFactory) Flows() flows.Interface { + return flows.New(f, f.namespace, f.tweakListOptions) +} diff --git a/pkg/client/informers/externalversions/flows/interface.go b/pkg/client/informers/externalversions/flows/interface.go new file mode 100644 index 00000000000..5534891ac8e --- /dev/null +++ b/pkg/client/informers/externalversions/flows/interface.go @@ -0,0 +1,46 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package flows + +import ( + v1alpha1 "github.com/knative/eventing/pkg/client/informers/externalversions/flows/v1alpha1" + internalinterfaces "github.com/knative/eventing/pkg/client/informers/externalversions/internalinterfaces" +) + +// Interface provides access to each of this group's versions. +type Interface interface { + // V1alpha1 provides access to shared informers for resources in V1alpha1. + V1alpha1() v1alpha1.Interface +} + +type group struct { + factory internalinterfaces.SharedInformerFactory + namespace string + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// New returns a new Interface. +func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { + return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} +} + +// V1alpha1 returns a new v1alpha1.Interface. +func (g *group) V1alpha1() v1alpha1.Interface { + return v1alpha1.New(g.factory, g.namespace, g.tweakListOptions) +} diff --git a/pkg/client/informers/externalversions/flows/v1alpha1/flow.go b/pkg/client/informers/externalversions/flows/v1alpha1/flow.go new file mode 100644 index 00000000000..19a5b8cf9ce --- /dev/null +++ b/pkg/client/informers/externalversions/flows/v1alpha1/flow.go @@ -0,0 +1,89 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + time "time" + + flows_v1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + versioned "github.com/knative/eventing/pkg/client/clientset/versioned" + internalinterfaces "github.com/knative/eventing/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "github.com/knative/eventing/pkg/client/listers/flows/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// FlowInformer provides access to a shared informer and lister for +// Flows. +type FlowInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.FlowLister +} + +type flowInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewFlowInformer constructs a new informer for Flow type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFlowInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredFlowInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredFlowInformer constructs a new informer for Flow type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredFlowInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.FlowsV1alpha1().Flows(namespace).List(options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.FlowsV1alpha1().Flows(namespace).Watch(options) + }, + }, + &flows_v1alpha1.Flow{}, + resyncPeriod, + indexers, + ) +} + +func (f *flowInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredFlowInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *flowInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&flows_v1alpha1.Flow{}, f.defaultInformer) +} + +func (f *flowInformer) Lister() v1alpha1.FlowLister { + return v1alpha1.NewFlowLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/informers/externalversions/flows/v1alpha1/interface.go b/pkg/client/informers/externalversions/flows/v1alpha1/interface.go new file mode 100644 index 00000000000..dceacfb7090 --- /dev/null +++ b/pkg/client/informers/externalversions/flows/v1alpha1/interface.go @@ -0,0 +1,45 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + internalinterfaces "github.com/knative/eventing/pkg/client/informers/externalversions/internalinterfaces" +) + +// Interface provides access to all the informers in this group version. +type Interface interface { + // Flows returns a FlowInformer. + Flows() FlowInformer +} + +type version struct { + factory internalinterfaces.SharedInformerFactory + namespace string + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// New returns a new Interface. +func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { + return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} +} + +// Flows returns a FlowInformer. +func (v *version) Flows() FlowInformer { + return &flowInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index b47a853dc5c..acd2c6c90f2 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -23,6 +23,7 @@ import ( v1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" feeds_v1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" + flows_v1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" schema "k8s.io/apimachinery/pkg/runtime/schema" cache "k8s.io/client-go/tools/cache" ) @@ -71,6 +72,10 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource case feeds_v1alpha1.SchemeGroupVersion.WithResource("feeds"): return &genericInformer{resource: resource.GroupResource(), informer: f.Feeds().V1alpha1().Feeds().Informer()}, nil + // Group=flows.knative.dev, Version=v1alpha1 + case flows_v1alpha1.SchemeGroupVersion.WithResource("flows"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Flows().V1alpha1().Flows().Informer()}, nil + } return nil, fmt.Errorf("no informer found for %v", resource) diff --git a/pkg/client/listers/flows/v1alpha1/expansion_generated.go b/pkg/client/listers/flows/v1alpha1/expansion_generated.go new file mode 100644 index 00000000000..cd115a41fa4 --- /dev/null +++ b/pkg/client/listers/flows/v1alpha1/expansion_generated.go @@ -0,0 +1,27 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +// FlowListerExpansion allows custom methods to be added to +// FlowLister. +type FlowListerExpansion interface{} + +// FlowNamespaceListerExpansion allows custom methods to be added to +// FlowNamespaceLister. +type FlowNamespaceListerExpansion interface{} diff --git a/pkg/client/listers/flows/v1alpha1/flow.go b/pkg/client/listers/flows/v1alpha1/flow.go new file mode 100644 index 00000000000..7358d8f0d5f --- /dev/null +++ b/pkg/client/listers/flows/v1alpha1/flow.go @@ -0,0 +1,94 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// FlowLister helps list Flows. +type FlowLister interface { + // List lists all Flows in the indexer. + List(selector labels.Selector) (ret []*v1alpha1.Flow, err error) + // Flows returns an object that can list and get Flows. + Flows(namespace string) FlowNamespaceLister + FlowListerExpansion +} + +// flowLister implements the FlowLister interface. +type flowLister struct { + indexer cache.Indexer +} + +// NewFlowLister returns a new FlowLister. +func NewFlowLister(indexer cache.Indexer) FlowLister { + return &flowLister{indexer: indexer} +} + +// List lists all Flows in the indexer. +func (s *flowLister) List(selector labels.Selector) (ret []*v1alpha1.Flow, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.Flow)) + }) + return ret, err +} + +// Flows returns an object that can list and get Flows. +func (s *flowLister) Flows(namespace string) FlowNamespaceLister { + return flowNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// FlowNamespaceLister helps list and get Flows. +type FlowNamespaceLister interface { + // List lists all Flows in the indexer for a given namespace. + List(selector labels.Selector) (ret []*v1alpha1.Flow, err error) + // Get retrieves the Flow from the indexer for a given namespace and name. + Get(name string) (*v1alpha1.Flow, error) + FlowNamespaceListerExpansion +} + +// flowNamespaceLister implements the FlowNamespaceLister +// interface. +type flowNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all Flows in the indexer for a given namespace. +func (s flowNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.Flow, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.Flow)) + }) + return ret, err +} + +// Get retrieves the Flow from the indexer for a given namespace and name. +func (s flowNamespaceLister) Get(name string) (*v1alpha1.Flow, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("flow"), name) + } + return obj.(*v1alpha1.Flow), nil +} diff --git a/pkg/controller/bus/controller.go b/pkg/controller/bus/controller.go index d4a4dd63be0..9fd8477a3cc 100644 --- a/pkg/controller/bus/controller.go +++ b/pkg/controller/bus/controller.go @@ -39,6 +39,7 @@ import ( appslisters "k8s.io/client-go/listers/apps/v1" corelisters "k8s.io/client-go/listers/core/v1" rbaclisters "k8s.io/client-go/listers/rbac/v1beta1" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -105,6 +106,7 @@ func NewController( kubeclientset kubernetes.Interface, busclientset clientset.Interface, servingclientset servingclientset.Interface, + restConfig *rest.Config, kubeInformerFactory kubeinformers.SharedInformerFactory, busInformerFactory informers.SharedInformerFactory, routeInformerFactory servinginformers.SharedInformerFactory) controller.Interface { diff --git a/pkg/controller/channel/controller.go b/pkg/controller/channel/controller.go index 77668ce982a..72289d5dc70 100644 --- a/pkg/controller/channel/controller.go +++ b/pkg/controller/channel/controller.go @@ -34,6 +34,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -104,6 +105,7 @@ func NewController( kubeclientset kubernetes.Interface, channelclientset clientset.Interface, servingclientset servingclientset.Interface, + restConfig *rest.Config, kubeInformerFactory kubeinformers.SharedInformerFactory, channelInformerFactory informers.SharedInformerFactory, servingInformerFactory servinginformers.SharedInformerFactory) controller.Interface { diff --git a/pkg/controller/clusterbus/controller.go b/pkg/controller/clusterbus/controller.go index 0784b7bbdf4..ff0e0740fe5 100644 --- a/pkg/controller/clusterbus/controller.go +++ b/pkg/controller/clusterbus/controller.go @@ -37,6 +37,7 @@ import ( typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" appslisters "k8s.io/client-go/listers/apps/v1" corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -103,6 +104,7 @@ func NewController( kubeclientset kubernetes.Interface, clusterbusclientset clientset.Interface, servingclientset servingclientset.Interface, + restConfig *rest.Config, kubeInformerFactory kubeinformers.SharedInformerFactory, clusterBusInformerFactory informers.SharedInformerFactory, routeInformerFactory servinginformers.SharedInformerFactory) controller.Interface { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 17e59a9ae14..efdc0dfb61d 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -19,6 +19,7 @@ package controller import ( kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" servingclientset "github.com/knative/serving/pkg/client/clientset/versioned" servinginformers "github.com/knative/serving/pkg/client/informers/externalversions" @@ -35,6 +36,7 @@ type Constructor func( kubernetes.Interface, clientset.Interface, servingclientset.Interface, + *rest.Config, kubeinformers.SharedInformerFactory, informers.SharedInformerFactory, servinginformers.SharedInformerFactory, diff --git a/pkg/controller/feed/controller.go b/pkg/controller/feed/controller.go index bf13ab20185..ffc0a1e41b0 100644 --- a/pkg/controller/feed/controller.go +++ b/pkg/controller/feed/controller.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -111,6 +112,7 @@ func NewController( kubeclientset kubernetes.Interface, feedsclientset clientset.Interface, servingclientset servingclientset.Interface, + restConfig *rest.Config, kubeInformerFactory kubeinformers.SharedInformerFactory, feedsInformerFactory informers.SharedInformerFactory, routeInformerFactory servinginformers.SharedInformerFactory) controller.Interface { diff --git a/pkg/controller/flow/dyn_client.go b/pkg/controller/flow/dyn_client.go new file mode 100644 index 00000000000..b78eac8e884 --- /dev/null +++ b/pkg/controller/flow/dyn_client.go @@ -0,0 +1,78 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flow + +import ( + "fmt" + "strings" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" +) + +// CreateDynamicClient creates a dynamic client for the Group Version for the +// ObjectReference. It can only be used for that APIVersion / Group +func CreateDynamicClient(config *rest.Config, ref *corev1.ObjectReference) (*dynamic.Client, error) { + // We need to tweak the configuration so that it points to the right + // resources under the ThirdPartyResources that Istio uses. + gvk := ref.GroupVersionKind() + + config.ContentConfig.GroupVersion = &schema.GroupVersion{ + Group: gvk.Group, + Version: gvk.Version, + } + config.APIPath = "apis" + return dynamic.NewClient(config) +} + +func CreateResourceInterface(config *rest.Config, ref *corev1.ObjectReference, namespace string) (dynamic.ResourceInterface, error) { + c, err := CreateDynamicClient(config, ref) + if err != nil { + return nil, err + } + + gvk := ref.GroupVersionKind() + kind := gvk.Kind + name := pluralizeKind(kind) + + resource := metav1.APIResource{ + Name: name, + Kind: gvk.Kind, + Namespaced: true, + } + r := c.Resource(&resource, namespace) + if r == nil { + return nil, fmt.Errorf("failed to create dynamic client resource") + } + return r, nil +} + +// takes a kind and pluralizes it. This is super terrible, but I am +// not aware of a generic way to do this. +// I am not alone in thinking this and I haven't found a better solution: +// This seems relevant: +// https://github.com/kubernetes/kubernetes/issues/18622 +func pluralizeKind(kind string) string { + ret := strings.ToLower(kind) + if strings.HasSuffix(ret, "s") { + return fmt.Sprintf("%ses", ret) + } + return fmt.Sprintf("%ss", ret) +} diff --git a/pkg/controller/flow/flow.go b/pkg/controller/flow/flow.go new file mode 100644 index 00000000000..1a8b592c2b3 --- /dev/null +++ b/pkg/controller/flow/flow.go @@ -0,0 +1,576 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package flow + +import ( + "fmt" + "log" + "time" + + "github.com/golang/glog" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtimetypes "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + + // TODO: Get rid of these, but needed as other controllers use them. + servingclientset "github.com/knative/serving/pkg/client/clientset/versioned" + servinginformers "github.com/knative/serving/pkg/client/informers/externalversions" + + "github.com/knative/eventing/pkg/controller" + + channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" + v1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1" + + clientset "github.com/knative/eventing/pkg/client/clientset/versioned" + flowscheme "github.com/knative/eventing/pkg/client/clientset/versioned/scheme" + informers "github.com/knative/eventing/pkg/client/informers/externalversions" + channelListers "github.com/knative/eventing/pkg/client/listers/channels/v1alpha1" + feedListers "github.com/knative/eventing/pkg/client/listers/feeds/v1alpha1" + listers "github.com/knative/eventing/pkg/client/listers/flows/v1alpha1" +) + +const controllerAgentName = "flow-controller" + +// TODO: This should come from a configmap +const defaultBusName = "stub" + +const ( + // SuccessSynced is used as part of the Event 'reason' when a Flow is synced + SuccessSynced = "Synced" + + // MessageResourceSynced is the message used for an Event fired when a Flow + // is synced successfully + MessageResourceSynced = "Flow synced successfully" +) + +var ( + flowControllerKind = v1alpha1.SchemeGroupVersion.WithKind("Flow") +) + +// Controller is the controller implementation for Flow resources +type Controller struct { + // kubeclientset is a standard kubernetes clientset + kubeclientset kubernetes.Interface + + // restConfig is used to create dynamic clients for + // resolving ObjectReference targets. + restConfig *rest.Config + + // clientset is a clientset for our own API group + clientset clientset.Interface + + flowsLister listers.FlowLister + flowsSynced cache.InformerSynced + + feedsLister feedListers.FeedLister + feedsSynced cache.InformerSynced + + channelsLister channelListers.ChannelLister + channelsSynced cache.InformerSynced + + subscriptionsLister channelListers.SubscriptionLister + subscriptionsSynced cache.InformerSynced + + // workqueue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + workqueue workqueue.RateLimitingInterface + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + recorder record.EventRecorder +} + +// NewController returns a new flow controller +func NewController( + kubeclientset kubernetes.Interface, + clientset clientset.Interface, + servingclientset servingclientset.Interface, + restConfig *rest.Config, + kubeInformerFactory kubeinformers.SharedInformerFactory, + flowsInformerFactory informers.SharedInformerFactory, + routeInformerFactory servinginformers.SharedInformerFactory) controller.Interface { + + // obtain a reference to a shared index informer for the Flow types. + flowInformer := flowsInformerFactory.Flows().V1alpha1() + + // obtain a reference to a shared index informer for the Feed types. + feedInformer := flowsInformerFactory.Feeds().V1alpha1() + + channelInformer := flowsInformerFactory.Channels().V1alpha1().Channels() + + subscriptionInformer := flowsInformerFactory.Channels().V1alpha1().Subscriptions() + + // Create event broadcaster + // Add flow-controller types to the default Kubernetes Scheme so Events can be + // logged for flow-controller types. + flowscheme.AddToScheme(scheme.Scheme) + glog.V(4).Info("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + + controller := &Controller{ + kubeclientset: kubeclientset, + restConfig: restConfig, + clientset: clientset, + flowsLister: flowInformer.Flows().Lister(), + flowsSynced: flowInformer.Flows().Informer().HasSynced, + feedsLister: feedInformer.Feeds().Lister(), + feedsSynced: feedInformer.Feeds().Informer().HasSynced, + channelsLister: channelInformer.Lister(), + channelsSynced: channelInformer.Informer().HasSynced, + subscriptionsLister: subscriptionInformer.Lister(), + subscriptionsSynced: subscriptionInformer.Informer().HasSynced, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Flows"), + recorder: recorder, + } + + glog.Info("Setting up event handlers") + + // Set up an event handler for when Flow resources change + flowInformer.Flows().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: controller.enqueueFlow, + UpdateFunc: func(old, new interface{}) { + controller.enqueueFlow(new) + }, + }) + + return controller +} + +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shutdown the workqueue and wait for +// workers to finish processing their current work items. +func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { + defer runtime.HandleCrash() + defer c.workqueue.ShutDown() + + // Start the informer factories to begin populating the informer caches + glog.Info("Starting Flow controller") + + // Wait for the caches to be synced before starting workers + glog.Info("Waiting for Flow informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, c.flowsSynced); !ok { + return fmt.Errorf("failed to wait for Flow caches to sync") + } + + glog.Info("Waiting for Feed informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, c.feedsSynced); !ok { + return fmt.Errorf("failed to wait for Feed caches to sync") + } + + glog.Info("Waiting for channel informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, c.channelsSynced); !ok { + return fmt.Errorf("failed to wait for Channel caches to sync") + } + + glog.Info("Starting workers") + // Launch two workers to process Flow resources + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + glog.Info("Started workers") + <-stopCh + glog.Info("Shutting down workers") + + return nil +} + +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// workqueue. +func (c *Controller) runWorker() { + for c.processNextWorkItem() { + } +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling Reconcile. +func (c *Controller) processNextWorkItem() bool { + obj, shutdown := c.workqueue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer c.workqueue.Done. + if err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer c.workqueue.Done(obj) + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. + key, ok := obj.(string) + if !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + c.workqueue.Forget(obj) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + // Run the Reconcile, passing it the namespace/name string of the + // Flow resource to be synced. + if err := c.Reconcile(key); err != nil { + return fmt.Errorf("error syncing '%s': %s", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.workqueue.Forget(obj) + glog.Infof("Successfully synced '%s'", key) + return nil + }(obj); err != nil { + runtime.HandleError(err) + } + + return true +} + +// enqueueFlow takes a Flow resource and converts it into a namespace/name +// string which is then put onto the work queue. This method should *not* be +// passed resources of any type other than Flow. +func (c *Controller) enqueueFlow(obj interface{}) { + var key string + var err error + if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { + runtime.HandleError(err) + return + } + c.workqueue.AddRateLimited(key) +} + +// Reconcile compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the Flow resource +// with the current status of the resource. +func (c *Controller) Reconcile(key string) error { + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + // Get the Flow resource with this namespace/name + original, err := c.flowsLister.Flows(namespace).Get(name) + if err != nil { + // The Flow resource may no longer exist, in which case we stop + // processing. + if errors.IsNotFound(err) { + runtime.HandleError(fmt.Errorf("flow '%s' in work queue no longer exists", key)) + return nil + } + return err + } + + // Don't mutate the informer's copy of our object. + flow := original.DeepCopy() + + // Reconcile this copy of the Flow and then write back any status + // updates regardless of whether the reconcile error out. + err = c.reconcile(flow) + if equality.Semantic.DeepEqual(original.Status, flow.Status) { + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the informer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + } else if _, err := c.updateStatus(flow); err != nil { + glog.Warningf("Failed to update flow status: %v", err) + return err + } + return err +} + +func (c *Controller) reconcile(flow *v1alpha1.Flow) error { + // See if the flow has been deleted + accessor, err := meta.Accessor(flow) + if err != nil { + log.Fatalf("Failed to get metadata: %s", err) + } + deletionTimestamp := accessor.GetDeletionTimestamp() + glog.Infof("DeletionTimestamp: %v", deletionTimestamp) + + target, err := c.resolveActionTarget(flow.Namespace, flow.Spec.Action) + if err != nil { + glog.Warningf("Failed to resolve target %v : %v", flow.Spec.Action, err) + return err + } + + // Ok, so target is the underlying k8s service (or URI if so specified) that we want to target + glog.Infof("Resolved Target to: %q", target) + + // Reconcile the Channel. Creates a channel that is the target that the Feed will use. + // TODO: We should reuse channels possibly. + channel, err := c.reconcileChannel(flow) + if err != nil { + glog.Warningf("Failed to reconcile channel : %v", err) + return err + } + + glog.Infof("Created Channel %q", channel.Name) + + subscription, err := c.reconcileSubscription(channel.Name, target, flow) + if err != nil { + glog.Warningf("Failed to reconcile subscription : %v", err) + return err + } + + glog.Infof("Created Subscription %q", subscription.Name) + + feed, err := c.reconcileFeed(channel.Name, flow) + if err != nil { + glog.Warningf("Failed to reconcile feed: %v", err) + } + + glog.Infof("Created Feed %q", feed.Name) + return nil +} + +func (c *Controller) updateStatus(u *v1alpha1.Flow) (*v1alpha1.Flow, error) { + flowClient := c.clientset.FlowsV1alpha1().Flows(u.Namespace) + newu, err := flowClient.Get(u.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + newu.Status = u.Status + + // Until #38113 is merged, we must use Update instead of UpdateStatus to + // update the Status block of the Flow resource. UpdateStatus will not + // allow changes to the Spec of the resource, which is ideal for ensuring + // nothing other than resource status has been updated. + return flowClient.Update(newu) +} + +// AddFinalizer adds value to the list of finalizers on obj +func AddFinalizer(obj runtimetypes.Object, value string) error { + accessor, err := meta.Accessor(obj) + if err != nil { + return err + } + finalizers := sets.NewString(accessor.GetFinalizers()...) + finalizers.Insert(value) + accessor.SetFinalizers(finalizers.List()) + return nil +} + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. It then updates the Status block of the Flow resource +// with the current status of the resource. +func (c *Controller) resolveActionTarget(namespace string, action v1alpha1.FlowAction) (string, error) { + glog.Infof("Resolving target: %v", action) + + if action.Target != nil { + return c.resolveObjectReference(namespace, action.Target) + } + if action.TargetURI != nil { + return *action.TargetURI, nil + } + + return "", fmt.Errorf("No resolvable action target: %+v", action) +} + +// resolveObjectReference fetches an object based on ObjectRefence. It assumes the +// object has a status["serviceName"] string in it and returns it. +func (c *Controller) resolveObjectReference(namespace string, ref *corev1.ObjectReference) (string, error) { + resourceClient, err := CreateResourceInterface(c.restConfig, ref, namespace) + if err != nil { + glog.Warningf("failed to create dynamic client resource: %v", err) + return "", err + } + + obj, err := resourceClient.Get(ref.Name, metav1.GetOptions{}) + if err != nil { + glog.Warningf("failed to get object: %v", err) + return "", err + } + status, ok := obj.Object["status"] + if !ok { + return "", fmt.Errorf("%q does not contain status", ref.Name) + } + statusMap := status.(map[string]interface{}) + serviceName, ok := statusMap["serviceName"] + if !ok { + return "", fmt.Errorf("%q does not contain serviceName in status", ref.Name) + } + serviceNameStr, ok := serviceName.(string) + if !ok { + return "", fmt.Errorf("%q status serviceName is not a string", ref.Name) + } + return serviceNameStr, nil +} + +func (c *Controller) reconcileChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Channel, error) { + channelName := flow.Name + + channel, err := c.channelsLister.Channels(flow.Namespace).Get(channelName) + if errors.IsNotFound(err) { + channel, err = c.createChannel(flow) + if err != nil { + glog.Errorf("Failed to create channel %q : %v", channelName, err) + return nil, err + } + } else if err != nil { + glog.Errorf("Failed to reconcile channel %q failed to get channels : %v", channelName, err) + return nil, err + } + + // Should make sure channel is what it should be. For now, just assume it's fine + // if it exists. + return channel, err +} + +func (c *Controller) createChannel(flow *v1alpha1.Flow) (*channelsv1alpha1.Channel, error) { + channelName := flow.Name + channel := &channelsv1alpha1.Channel{ + ObjectMeta: metav1.ObjectMeta{ + Name: channelName, + Namespace: flow.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *c.NewControllerRef(flow), + }, + }, + Spec: channelsv1alpha1.ChannelSpec{ + ClusterBus: defaultBusName, + }, + } + return c.clientset.ChannelsV1alpha1().Channels(flow.Namespace).Create(channel) +} + +func (c *Controller) reconcileSubscription(channelName string, target string, flow *v1alpha1.Flow) (*channelsv1alpha1.Subscription, error) { + subscriptionName := flow.Name + subscription, err := c.subscriptionsLister.Subscriptions(flow.Namespace).Get(subscriptionName) + if errors.IsNotFound(err) { + subscription, err = c.createSubscription(channelName, target, flow) + if err != nil { + glog.Errorf("Failed to create subscription %q : %v", subscriptionName, err) + return nil, err + } + } else if err != nil { + glog.Errorf("Failed to reconcile subscription %q failed to get subscriptions : %v", subscriptionName, err) + return nil, err + } + + // Should make sure subscription is what it should be. For now, just assume it's fine + // if it exists. + return subscription, err +} + +func (c *Controller) createSubscription(channelName string, target string, flow *v1alpha1.Flow) (*channelsv1alpha1.Subscription, error) { + subscriptionName := flow.Name + subscription := &channelsv1alpha1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: subscriptionName, + Namespace: flow.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *c.NewControllerRef(flow), + }, + }, + Spec: channelsv1alpha1.SubscriptionSpec{ + Channel: channelName, + Subscriber: target, + }, + } + return c.clientset.ChannelsV1alpha1().Subscriptions(flow.Namespace).Create(subscription) +} + +func (c *Controller) reconcileFeed(channelName string, flow *v1alpha1.Flow) (*feedsv1alpha1.Feed, error) { + feedName := flow.Name + feed, err := c.feedsLister.Feeds(flow.Namespace).Get(feedName) + if errors.IsNotFound(err) { + feed, err = c.createFeed(feedName, flow) + if err != nil { + glog.Errorf("Failed to create feed %q : %v", feedName, err) + return nil, err + } + } else if err != nil { + glog.Errorf("Failed to reconcile feed %q failed to get feeds : %v", feedName, err) + return nil, err + } + + // Should make sure feed is what it should be. For now, just assume it's fine + // if it exists. + return feed, err + +} + +func (c *Controller) createFeed(channelName string, flow *v1alpha1.Flow) (*feedsv1alpha1.Feed, error) { + feedName := flow.Name + feed := &feedsv1alpha1.Feed{ + ObjectMeta: metav1.ObjectMeta{ + Name: feedName, + Namespace: flow.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *c.NewControllerRef(flow), + }, + }, + Spec: feedsv1alpha1.FeedSpec{ + Action: feedsv1alpha1.FeedAction{ChannelName: channelName}, + Trigger: feedsv1alpha1.EventTrigger{ + EventType: flow.Spec.Trigger.EventType, + Resource: flow.Spec.Trigger.Resource, + Service: flow.Spec.Trigger.Service, + }, + }, + } + if flow.Spec.ServiceAccountName != "" { + feed.Spec.ServiceAccountName = flow.Spec.ServiceAccountName + } + + if flow.Spec.Trigger.Parameters != nil { + feed.Spec.Trigger.Parameters = flow.Spec.Trigger.Parameters + } + if flow.Spec.Trigger.ParametersFrom != nil { + feed.Spec.Trigger.ParametersFrom = flow.Spec.Trigger.ParametersFrom + } + + return c.clientset.FeedsV1alpha1().Feeds(flow.Namespace).Create(feed) +} + +func (c *Controller) NewControllerRef(flow *v1alpha1.Flow) *metav1.OwnerReference { + blockOwnerDeletion := false + isController := false + revRef := metav1.NewControllerRef(flow, flowControllerKind) + revRef.BlockOwnerDeletion = &blockOwnerDeletion + revRef.Controller = &isController + return revRef +} diff --git a/sample/README.md b/sample/README.md index 06336926ee7..1ddf79241a4 100644 --- a/sample/README.md +++ b/sample/README.md @@ -21,3 +21,4 @@ functionality. * [Github Pull Request Handler](./github) - A simple handler for Github Pull Requests * [GCP PubSub Receiver Handler](./gcp_pubsub_function) - A simple handler for processing GCP PubSub events * [K8S events Handler](./k8s_events_function) - A simple handler for processing k8s events in the cluster +* [K8S events Handler using Flow](./flow) - Example using Flow object that wires k8s events through the bus diff --git a/sample/flow/README.md b/sample/flow/README.md new file mode 100644 index 00000000000..a63e5da9529 --- /dev/null +++ b/sample/flow/README.md @@ -0,0 +1,57 @@ +# Flow Example + +The flow sample includes a function that listens for k8s events on a cluster and a flow +object that wires it into the function + +## Deploy + +First, install the Stub bus, if not already installed. You **must** change the Kind to ClusterBus + +``` +ko apply -f config/buses/stub/stub-bus.yaml +``` + +## Install k8s events as an event source +```shell +ko apply -f pkg/sources/k8sevents/ +``` + +## Creating a Service Account +Because the Receive Adapter needs to run a deployment, you need to specify what +Service Account should be used in the target namespace for running the Receive Adapter. +Feed.Spec has a field that allows you to specify this. By default it uses "default" for +feed which typically has no privileges, but this feed requires standing up a +deployment, so you need to either use an existing Service Account with appropriate +priviledges or create a new one. This example creates a Service Account and grants +it cluster admin access, and you probably wouldn't want to do that in production +settings, but for this example it will suffice just fine. + +```shell +ko apply -f sample/k8s_events_function/serviceaccount.yaml +ko apply -f sample/k8s_events_function/serviceaccountbinding.yaml +``` + +## Deploy function +Then, deploy the k8s-events function + +```shell +ko apply -f sample/k8s_events_function/function.yaml +``` + +This will create a Route and Configuration, so make sure it's up + +``` +$ kubectl get pods +NAME READY STATUS RESTARTS AGE +k8s-events-00001-deployment-78fb756c8b-rfwl4 3/3 Running 0 40m +``` + +## Create a flow wiring this into k8s events +```shell +ko apply -f sample/flow/flow.yaml +``` + +## Check the logs of the function +```shell +kubectl logs k8s-events-00001-deployment-78fb756c8b-rfwl4 user-container +``` diff --git a/sample/flow/flow.yaml b/sample/flow/flow.yaml new file mode 100644 index 00000000000..f820a05bad3 --- /dev/null +++ b/sample/flow/flow.yaml @@ -0,0 +1,38 @@ +# Copyright 2018 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: flows.knative.dev/v1alpha1 +kind: Flow +metadata: + name: flow-example + namespace: default +spec: + serviceAccountName: feed-sa + trigger: + eventType: receiveevent + # This is kind of superfluous due to parameters, but the current + # feed model still requires a resource + resource: k8sevents/receiveevent + service: k8sevents + parameters: + namespace: default + action: + target: +# Once Route supports serviceName, use that +# kind: Route + kind: Revision + apiVersion: serving.knative.dev/v1alpha1 +# Once Route supports serviceName, use that +# name: k8s-events + name: k8s-events-00001 diff --git a/sample/hello/README.md b/sample/hello/README.md index c7988f13e68..501538886dc 100644 --- a/sample/hello/README.md +++ b/sample/hello/README.md @@ -5,7 +5,7 @@ The hello sample includes a Knative Service, Channel and Subscription. First, install the Stub bus, if not already installed: ``` -ko apply -f config/buses/stub.yaml +ko apply -f config/buses/stub/stub-bus.yaml ``` Then, deploy the hello function, channel and subscription: diff --git a/sample/k8s_events_function/function.yaml b/sample/k8s_events_function/function.yaml new file mode 100644 index 00000000000..306364db4b0 --- /dev/null +++ b/sample/k8s_events_function/function.yaml @@ -0,0 +1,38 @@ +# Copyright 2018 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: serving.knative.dev/v1alpha1 +kind: Configuration +metadata: + name: k8s-events + namespace: default +spec: + revisionTemplate: + metadata: + labels: + knative.dev/type: container + spec: + container: + image: github.com/knative/eventing/sample/k8s_events_function + +--- +apiVersion: serving.knative.dev/v1alpha1 +kind: Route +metadata: + name: k8s-events + namespace: default +spec: + traffic: + - configurationName: k8s-events + percent: 100 diff --git a/vendor/k8s.io/apimachinery/pkg/api/equality/semantic.go b/vendor/k8s.io/apimachinery/pkg/api/equality/semantic.go new file mode 100644 index 00000000000..f02fa8e4340 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/api/equality/semantic.go @@ -0,0 +1,49 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package equality + +import ( + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" +) + +// Semantic can do semantic deep equality checks for api objects. +// Example: apiequality.Semantic.DeepEqual(aPod, aPodWithNonNilButEmptyMaps) == true +var Semantic = conversion.EqualitiesOrDie( + func(a, b resource.Quantity) bool { + // Ignore formatting, only care that numeric value stayed the same. + // TODO: if we decide it's important, it should be safe to start comparing the format. + // + // Uninitialized quantities are equivalent to 0 quantities. + return a.Cmp(b) == 0 + }, + func(a, b metav1.MicroTime) bool { + return a.UTC() == b.UTC() + }, + func(a, b metav1.Time) bool { + return a.UTC() == b.UTC() + }, + func(a, b labels.Selector) bool { + return a.String() == b.String() + }, + func(a, b fields.Selector) bool { + return a.String() == b.String() + }, +) diff --git a/vendor/k8s.io/client-go/dynamic/client.go b/vendor/k8s.io/client-go/dynamic/client.go new file mode 100644 index 00000000000..833e43537b0 --- /dev/null +++ b/vendor/k8s.io/client-go/dynamic/client.go @@ -0,0 +1,379 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package dynamic provides a client interface to arbitrary Kubernetes +// APIs that exposes common high level operations and exposes common +// metadata. +package dynamic + +import ( + "encoding/json" + "errors" + "io" + "net/url" + "strings" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/conversion/queryparams" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/scheme" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/util/flowcontrol" +) + +// Interface is a Kubernetes client that allows you to access metadata +// and manipulate metadata of a Kubernetes API group. +type Interface interface { + // GetRateLimiter returns the rate limiter for this client. + GetRateLimiter() flowcontrol.RateLimiter + // Resource returns an API interface to the specified resource for this client's + // group and version. If resource is not a namespaced resource, then namespace + // is ignored. The ResourceInterface inherits the parameter codec of this client. + Resource(resource *metav1.APIResource, namespace string) ResourceInterface + // ParameterCodec returns a client with the provided parameter codec. + ParameterCodec(parameterCodec runtime.ParameterCodec) Interface +} + +// ResourceInterface is an API interface to a specific resource under a +// dynamic client. +type ResourceInterface interface { + // List returns a list of objects for this resource. + List(opts metav1.ListOptions) (runtime.Object, error) + // Get gets the resource with the specified name. + Get(name string, opts metav1.GetOptions) (*unstructured.Unstructured, error) + // Delete deletes the resource with the specified name. + Delete(name string, opts *metav1.DeleteOptions) error + // DeleteCollection deletes a collection of objects. + DeleteCollection(deleteOptions *metav1.DeleteOptions, listOptions metav1.ListOptions) error + // Create creates the provided resource. + Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) + // Update updates the provided resource. + Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) + // Watch returns a watch.Interface that watches the resource. + Watch(opts metav1.ListOptions) (watch.Interface, error) + // Patch patches the provided resource. + Patch(name string, pt types.PatchType, data []byte) (*unstructured.Unstructured, error) +} + +// Client is a Kubernetes client that allows you to access metadata +// and manipulate metadata of a Kubernetes API group, and implements Interface. +type Client struct { + cl *restclient.RESTClient + parameterCodec runtime.ParameterCodec +} + +// NewClient returns a new client based on the passed in config. The +// codec is ignored, as the dynamic client uses it's own codec. +func NewClient(conf *restclient.Config) (*Client, error) { + // avoid changing the original config + confCopy := *conf + conf = &confCopy + + contentConfig := ContentConfig() + contentConfig.GroupVersion = conf.GroupVersion + if conf.NegotiatedSerializer != nil { + contentConfig.NegotiatedSerializer = conf.NegotiatedSerializer + } + conf.ContentConfig = contentConfig + + if conf.APIPath == "" { + conf.APIPath = "/api" + } + + if len(conf.UserAgent) == 0 { + conf.UserAgent = restclient.DefaultKubernetesUserAgent() + } + + cl, err := restclient.RESTClientFor(conf) + if err != nil { + return nil, err + } + + return &Client{cl: cl}, nil +} + +// GetRateLimiter returns rate limier. +func (c *Client) GetRateLimiter() flowcontrol.RateLimiter { + return c.cl.GetRateLimiter() +} + +// Resource returns an API interface to the specified resource for this client's +// group and version. If resource is not a namespaced resource, then namespace +// is ignored. The ResourceInterface inherits the parameter codec of c. +func (c *Client) Resource(resource *metav1.APIResource, namespace string) ResourceInterface { + return &ResourceClient{ + cl: c.cl, + resource: resource, + ns: namespace, + parameterCodec: c.parameterCodec, + } +} + +// ParameterCodec returns a client with the provided parameter codec. +func (c *Client) ParameterCodec(parameterCodec runtime.ParameterCodec) Interface { + return &Client{ + cl: c.cl, + parameterCodec: parameterCodec, + } +} + +// ResourceClient is an API interface to a specific resource under a +// dynamic client, and implements ResourceInterface. +type ResourceClient struct { + cl *restclient.RESTClient + resource *metav1.APIResource + ns string + parameterCodec runtime.ParameterCodec +} + +func (rc *ResourceClient) parseResourceSubresourceName() (string, []string) { + var resourceName string + var subresourceName []string + if strings.Contains(rc.resource.Name, "/") { + resourceName = strings.Split(rc.resource.Name, "/")[0] + subresourceName = strings.Split(rc.resource.Name, "/")[1:] + } else { + resourceName = rc.resource.Name + } + + return resourceName, subresourceName +} + +// List returns a list of objects for this resource. +func (rc *ResourceClient) List(opts metav1.ListOptions) (runtime.Object, error) { + parameterEncoder := rc.parameterCodec + if parameterEncoder == nil { + parameterEncoder = defaultParameterEncoder + } + return rc.cl.Get(). + NamespaceIfScoped(rc.ns, rc.resource.Namespaced). + Resource(rc.resource.Name). + VersionedParams(&opts, parameterEncoder). + Do(). + Get() +} + +// Get gets the resource with the specified name. +func (rc *ResourceClient) Get(name string, opts metav1.GetOptions) (*unstructured.Unstructured, error) { + parameterEncoder := rc.parameterCodec + if parameterEncoder == nil { + parameterEncoder = defaultParameterEncoder + } + result := new(unstructured.Unstructured) + resourceName, subresourceName := rc.parseResourceSubresourceName() + err := rc.cl.Get(). + NamespaceIfScoped(rc.ns, rc.resource.Namespaced). + Resource(resourceName). + SubResource(subresourceName...). + VersionedParams(&opts, parameterEncoder). + Name(name). + Do(). + Into(result) + return result, err +} + +// Delete deletes the resource with the specified name. +func (rc *ResourceClient) Delete(name string, opts *metav1.DeleteOptions) error { + return rc.cl.Delete(). + NamespaceIfScoped(rc.ns, rc.resource.Namespaced). + Resource(rc.resource.Name). + Name(name). + Body(opts). + Do(). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (rc *ResourceClient) DeleteCollection(deleteOptions *metav1.DeleteOptions, listOptions metav1.ListOptions) error { + parameterEncoder := rc.parameterCodec + if parameterEncoder == nil { + parameterEncoder = defaultParameterEncoder + } + return rc.cl.Delete(). + NamespaceIfScoped(rc.ns, rc.resource.Namespaced). + Resource(rc.resource.Name). + VersionedParams(&listOptions, parameterEncoder). + Body(deleteOptions). + Do(). + Error() +} + +// Create creates the provided resource. +func (rc *ResourceClient) Create(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + result := new(unstructured.Unstructured) + resourceName, subresourceName := rc.parseResourceSubresourceName() + req := rc.cl.Post(). + NamespaceIfScoped(rc.ns, rc.resource.Namespaced). + Resource(resourceName). + Body(obj) + if len(subresourceName) > 0 { + // If the provided resource is a subresource, the POST request should contain + // object name. Examples of subresources that support Create operation: + // core/v1/pods/{name}/binding + // core/v1/pods/{name}/eviction + // extensions/v1beta1/deployments/{name}/rollback + // apps/v1beta1/deployments/{name}/rollback + // NOTE: Currently our system assumes every subresource object has the same + // name as the parent resource object. E.g. a pods/binding object having + // metadada.name "foo" means pod "foo" is being bound. We may need to + // change this if we break the assumption in the future. + req = req.SubResource(subresourceName...). + Name(obj.GetName()) + } + err := req.Do(). + Into(result) + return result, err +} + +// Update updates the provided resource. +func (rc *ResourceClient) Update(obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { + result := new(unstructured.Unstructured) + if len(obj.GetName()) == 0 { + return result, errors.New("object missing name") + } + resourceName, subresourceName := rc.parseResourceSubresourceName() + err := rc.cl.Put(). + NamespaceIfScoped(rc.ns, rc.resource.Namespaced). + Resource(resourceName). + SubResource(subresourceName...). + // NOTE: Currently our system assumes every subresource object has the same + // name as the parent resource object. E.g. a pods/binding object having + // metadada.name "foo" means pod "foo" is being bound. We may need to + // change this if we break the assumption in the future. + Name(obj.GetName()). + Body(obj). + Do(). + Into(result) + return result, err +} + +// Watch returns a watch.Interface that watches the resource. +func (rc *ResourceClient) Watch(opts metav1.ListOptions) (watch.Interface, error) { + parameterEncoder := rc.parameterCodec + if parameterEncoder == nil { + parameterEncoder = defaultParameterEncoder + } + opts.Watch = true + return rc.cl.Get(). + NamespaceIfScoped(rc.ns, rc.resource.Namespaced). + Resource(rc.resource.Name). + VersionedParams(&opts, parameterEncoder). + Watch() +} + +// Patch applies the patch and returns the patched resource. +func (rc *ResourceClient) Patch(name string, pt types.PatchType, data []byte) (*unstructured.Unstructured, error) { + result := new(unstructured.Unstructured) + resourceName, subresourceName := rc.parseResourceSubresourceName() + err := rc.cl.Patch(pt). + NamespaceIfScoped(rc.ns, rc.resource.Namespaced). + Resource(resourceName). + SubResource(subresourceName...). + Name(name). + Body(data). + Do(). + Into(result) + return result, err +} + +// dynamicCodec is a codec that wraps the standard unstructured codec +// with special handling for Status objects. +type dynamicCodec struct{} + +func (dynamicCodec) Decode(data []byte, gvk *schema.GroupVersionKind, obj runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) { + obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(data, gvk, obj) + if err != nil { + return nil, nil, err + } + + if _, ok := obj.(*metav1.Status); !ok && strings.ToLower(gvk.Kind) == "status" { + obj = &metav1.Status{} + err := json.Unmarshal(data, obj) + if err != nil { + return nil, nil, err + } + } + + return obj, gvk, nil +} + +func (dynamicCodec) Encode(obj runtime.Object, w io.Writer) error { + return unstructured.UnstructuredJSONScheme.Encode(obj, w) +} + +// ContentConfig returns a restclient.ContentConfig for dynamic types. +func ContentConfig() restclient.ContentConfig { + var jsonInfo runtime.SerializerInfo + // TODO: scheme.Codecs here should become "pkg/apis/server/scheme" which is the minimal core you need + // to talk to a kubernetes server + for _, info := range scheme.Codecs.SupportedMediaTypes() { + if info.MediaType == runtime.ContentTypeJSON { + jsonInfo = info + break + } + } + + jsonInfo.Serializer = dynamicCodec{} + jsonInfo.PrettySerializer = nil + return restclient.ContentConfig{ + AcceptContentTypes: runtime.ContentTypeJSON, + ContentType: runtime.ContentTypeJSON, + NegotiatedSerializer: serializer.NegotiatedSerializerWrapper(jsonInfo), + } +} + +// paramaterCodec is a codec converts an API object to query +// parameters without trying to convert to the target version. +type parameterCodec struct{} + +func (parameterCodec) EncodeParameters(obj runtime.Object, to schema.GroupVersion) (url.Values, error) { + return queryparams.Convert(obj) +} + +func (parameterCodec) DecodeParameters(parameters url.Values, from schema.GroupVersion, into runtime.Object) error { + return errors.New("DecodeParameters not implemented on dynamic parameterCodec") +} + +var defaultParameterEncoder runtime.ParameterCodec = parameterCodec{} + +type versionedParameterEncoderWithV1Fallback struct{} + +func (versionedParameterEncoderWithV1Fallback) EncodeParameters(obj runtime.Object, to schema.GroupVersion) (url.Values, error) { + ret, err := scheme.ParameterCodec.EncodeParameters(obj, to) + if err != nil && runtime.IsNotRegisteredError(err) { + // fallback to v1 + return scheme.ParameterCodec.EncodeParameters(obj, v1.SchemeGroupVersion) + } + return ret, err +} + +func (versionedParameterEncoderWithV1Fallback) DecodeParameters(parameters url.Values, from schema.GroupVersion, into runtime.Object) error { + return errors.New("DecodeParameters not implemented on versionedParameterEncoderWithV1Fallback") +} + +// VersionedParameterEncoderWithV1Fallback is useful for encoding query +// parameters for custom resources. It tries to convert object to the +// specified version before converting it to query parameters, and falls back to +// converting to v1 if the object is not registered in the specified version. +// For the record, currently API server always treats query parameters sent to a +// custom resource endpoint as v1. +var VersionedParameterEncoderWithV1Fallback runtime.ParameterCodec = versionedParameterEncoderWithV1Fallback{} diff --git a/vendor/k8s.io/client-go/dynamic/client_pool.go b/vendor/k8s.io/client-go/dynamic/client_pool.go new file mode 100644 index 00000000000..a5e1b2978c5 --- /dev/null +++ b/vendor/k8s.io/client-go/dynamic/client_pool.go @@ -0,0 +1,122 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamic + +import ( + "sync" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + restclient "k8s.io/client-go/rest" +) + +// ClientPool manages a pool of dynamic clients. +type ClientPool interface { + // ClientForGroupVersionResource returns a client configured for the specified groupVersionResource. + // Resource may be empty. + ClientForGroupVersionResource(resource schema.GroupVersionResource) (Interface, error) + // ClientForGroupVersionKind returns a client configured for the specified groupVersionKind. + // Kind may be empty. + ClientForGroupVersionKind(kind schema.GroupVersionKind) (Interface, error) +} + +// APIPathResolverFunc knows how to convert a groupVersion to its API path. The Kind field is +// optional. +type APIPathResolverFunc func(kind schema.GroupVersionKind) string + +// LegacyAPIPathResolverFunc can resolve paths properly with the legacy API. +func LegacyAPIPathResolverFunc(kind schema.GroupVersionKind) string { + if len(kind.Group) == 0 { + return "/api" + } + return "/apis" +} + +// clientPoolImpl implements ClientPool and caches clients for the resource group versions +// is asked to retrieve. This type is thread safe. +type clientPoolImpl struct { + lock sync.RWMutex + config *restclient.Config + clients map[schema.GroupVersion]*Client + apiPathResolverFunc APIPathResolverFunc + mapper meta.RESTMapper +} + +// NewClientPool returns a ClientPool from the specified config. It reuses clients for the same +// group version. It is expected this type may be wrapped by specific logic that special cases certain +// resources or groups. +func NewClientPool(config *restclient.Config, mapper meta.RESTMapper, apiPathResolverFunc APIPathResolverFunc) ClientPool { + confCopy := *config + + return &clientPoolImpl{ + config: &confCopy, + clients: map[schema.GroupVersion]*Client{}, + apiPathResolverFunc: apiPathResolverFunc, + mapper: mapper, + } +} + +// Instantiates a new dynamic client pool with the given config. +func NewDynamicClientPool(cfg *restclient.Config) ClientPool { + // restMapper is not needed when using LegacyAPIPathResolverFunc + emptyMapper := meta.MultiRESTMapper{} + return NewClientPool(cfg, emptyMapper, LegacyAPIPathResolverFunc) +} + +// ClientForGroupVersionResource uses the provided RESTMapper to identify the appropriate resource. Resource may +// be empty. If no matching kind is found the underlying client for that group is still returned. +func (c *clientPoolImpl) ClientForGroupVersionResource(resource schema.GroupVersionResource) (Interface, error) { + kinds, err := c.mapper.KindsFor(resource) + if err != nil { + if meta.IsNoMatchError(err) { + return c.ClientForGroupVersionKind(schema.GroupVersionKind{Group: resource.Group, Version: resource.Version}) + } + return nil, err + } + return c.ClientForGroupVersionKind(kinds[0]) +} + +// ClientForGroupVersion returns a client for the specified groupVersion, creates one if none exists. Kind +// in the GroupVersionKind may be empty. +func (c *clientPoolImpl) ClientForGroupVersionKind(kind schema.GroupVersionKind) (Interface, error) { + c.lock.Lock() + defer c.lock.Unlock() + + gv := kind.GroupVersion() + + // do we have a client already configured? + if existingClient, found := c.clients[gv]; found { + return existingClient, nil + } + + // avoid changing the original config + confCopy := *c.config + conf := &confCopy + + // we need to set the api path based on group version, if no group, default to legacy path + conf.APIPath = c.apiPathResolverFunc(kind) + + // we need to make a client + conf.GroupVersion = &gv + + dynamicClient, err := NewClient(conf) + if err != nil { + return nil, err + } + c.clients[gv] = dynamicClient + return dynamicClient, nil +} diff --git a/vendor/k8s.io/client-go/dynamic/dynamic_util.go b/vendor/k8s.io/client-go/dynamic/dynamic_util.go new file mode 100644 index 00000000000..c2cf0daeae0 --- /dev/null +++ b/vendor/k8s.io/client-go/dynamic/dynamic_util.go @@ -0,0 +1,96 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamic + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// VersionInterfaces provides an object converter and metadata +// accessor appropriate for use with unstructured objects. +func VersionInterfaces(schema.GroupVersion) (*meta.VersionInterfaces, error) { + return &meta.VersionInterfaces{ + ObjectConvertor: &unstructured.UnstructuredObjectConverter{}, + MetadataAccessor: meta.NewAccessor(), + }, nil +} + +// NewDiscoveryRESTMapper returns a RESTMapper based on discovery information. +func NewDiscoveryRESTMapper(resources []*metav1.APIResourceList, versionFunc meta.VersionInterfacesFunc) (*meta.DefaultRESTMapper, error) { + rm := meta.NewDefaultRESTMapper(nil, versionFunc) + for _, resourceList := range resources { + gv, err := schema.ParseGroupVersion(resourceList.GroupVersion) + if err != nil { + return nil, err + } + + for _, resource := range resourceList.APIResources { + gvk := gv.WithKind(resource.Kind) + scope := meta.RESTScopeRoot + if resource.Namespaced { + scope = meta.RESTScopeNamespace + } + rm.Add(gvk, scope) + } + } + return rm, nil +} + +// ObjectTyper provides an ObjectTyper implementation for +// unstructured.Unstructured object based on discovery information. +type ObjectTyper struct { + registered map[schema.GroupVersionKind]bool +} + +// NewObjectTyper constructs an ObjectTyper from discovery information. +func NewObjectTyper(resources []*metav1.APIResourceList) (runtime.ObjectTyper, error) { + ot := &ObjectTyper{registered: make(map[schema.GroupVersionKind]bool)} + for _, resourceList := range resources { + gv, err := schema.ParseGroupVersion(resourceList.GroupVersion) + if err != nil { + return nil, err + } + + for _, resource := range resourceList.APIResources { + ot.registered[gv.WithKind(resource.Kind)] = true + } + } + return ot, nil +} + +// ObjectKinds returns a slice of one element with the +// group,version,kind of the provided object, or an error if the +// object is not *unstructured.Unstructured or has no group,version,kind +// information. +func (ot *ObjectTyper) ObjectKinds(obj runtime.Object) ([]schema.GroupVersionKind, bool, error) { + if _, ok := obj.(*unstructured.Unstructured); !ok { + return nil, false, fmt.Errorf("type %T is invalid for determining dynamic object types", obj) + } + return []schema.GroupVersionKind{obj.GetObjectKind().GroupVersionKind()}, false, nil +} + +// Recognizes returns true if the provided group,version,kind was in +// the discovery information. +func (ot *ObjectTyper) Recognizes(gvk schema.GroupVersionKind) bool { + return ot.registered[gvk] +}