diff --git a/cmd/controller/controller-runtime-main.go b/cmd/controller/controller-runtime-main.go index bac956c5037..485b4b44899 100644 --- a/cmd/controller/controller-runtime-main.go +++ b/cmd/controller/controller-runtime-main.go @@ -23,6 +23,7 @@ import ( "github.com/knative/eventing/pkg/controller/eventing/subscription" "github.com/knative/eventing/pkg/controller/feed" "github.com/knative/eventing/pkg/controller/flow" + containercontroller "github.com/knative/eventing/pkg/provisioners/container/controller" "go.uber.org/zap" "strings" @@ -49,6 +50,7 @@ type ProvideFunc func(manager.Manager) (controller.Controller, error) // be added to the default providers list. var ExperimentalControllers = map[string]ProvideFunc{ "subscription.eventing.knative.dev": subscription.ProvideController, + "container-provisioner": containercontroller.ProvideController, } // controllerRuntimeStart runs controllers written for controller-runtime. It's diff --git a/config/500-controller.yaml b/config/500-controller.yaml index 03570245ab7..fd5efda3a76 100644 --- a/config/500-controller.yaml +++ b/config/500-controller.yaml @@ -31,7 +31,7 @@ spec: args: [ "-logtostderr", "-stderrthreshold", "INFO", - "--experimentalControllers=subscription.eventing.knative.dev" # comma separated list. + "--experimentalControllers=subscription.eventing.knative.dev,container-provisioner" # comma separated list. ] volumeMounts: - name: config-logging diff --git a/pkg/apis/eventing/v1alpha1/register.go b/pkg/apis/eventing/v1alpha1/register.go index e3fb19db05d..31536db144e 100644 --- a/pkg/apis/eventing/v1alpha1/register.go +++ b/pkg/apis/eventing/v1alpha1/register.go @@ -49,6 +49,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ChannelList{}, &ClusterProvisioner{}, &ClusterProvisionerList{}, + &Source{}, + &SourceList{}, &Subscription{}, &SubscriptionList{}, ) diff --git a/pkg/apis/eventing/v1alpha1/source_types.go b/pkg/apis/eventing/v1alpha1/source_types.go index 5b58b416b95..29ded4077f0 100644 --- a/pkg/apis/eventing/v1alpha1/source_types.go +++ b/pkg/apis/eventing/v1alpha1/source_types.go @@ -17,6 +17,7 @@ limitations under the License. package v1alpha1 import ( + "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/knative/pkg/apis/duck" @@ -111,6 +112,12 @@ type SourceStatus struct { // +patchStrategy=merge Conditions duckv1alpha1.Conditions `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` + // Provisioned holds the status of a Provisioned Object at a point in time. + // +optional + // +patchMergeKey=name + // +patchStrategy=merge + Provisioned []ProvisionedObjectStatus `json:"provisioned,omitempty" patchStrategy:"merge" patchMergeKey:"name"` + // ObservedGeneration is the 'Generation' of the Source that // was last reconciled by the controller. // +optional @@ -120,6 +127,17 @@ type SourceStatus struct { Subscribable duckv1alpha1.Subscribable `json:"subscribable,omitempty"` } +type ProvisionedObjectStatus struct { + // Name of Object + Name string `json:"name,omitempty"` + // Type is the fully qualified object type. + Type string `json:"type,omitempty"` + // Status is the current relationship between Source and Object. + Status string `json:"status,omitempty"` + // Reason is the detailed description describing current relationship status. + Reason string `json:"reason,omitempty"` +} + // GetCondition returns the condition currently associated with the given type, or nil. func (ss *SourceStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition { return sourceCondSet.Manage(ss).GetCondition(t) @@ -145,6 +163,26 @@ func (ss *SourceStatus) MarkDeprovisioned(reason, messageFormat string, messageA sourceCondSet.Manage(ss).MarkFalse(SourceConditionProvisioned, reason, messageFormat, messageA...) } +// MarkProvisioned sets the condition that the source has had its backing resources created. +func (ss *SourceStatus) SetProvisionedObjectState(name, objType, status, reasonFormat string, reasonA ...interface{}) { + reason := fmt.Sprintf(reasonFormat, reasonA...) + newP := ProvisionedObjectStatus{ + Name: name, + Type: objType, + Status: status, + Reason: reason, + } + newProvisioned := make([]ProvisionedObjectStatus, 0, len(ss.Provisioned)) + for _, p := range ss.Provisioned { + if p.Name == newP.Name { + newProvisioned = append(newProvisioned, newP) + } else { + newProvisioned = append(newProvisioned, p) + } + } + ss.Provisioned = newProvisioned +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // SourceList is a list of Source resources diff --git a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go index dcbd9410622..abbe151f621 100644 --- a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -290,6 +290,22 @@ func (in *ClusterProvisionerStatus) DeepCopy() *ClusterProvisionerStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ProvisionedObjectStatus) DeepCopyInto(out *ProvisionedObjectStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ProvisionedObjectStatus. +func (in *ProvisionedObjectStatus) DeepCopy() *ProvisionedObjectStatus { + if in == nil { + return nil + } + out := new(ProvisionedObjectStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ProvisionerReference) DeepCopyInto(out *ProvisionerReference) { *out = *in @@ -454,6 +470,11 @@ func (in *SourceStatus) DeepCopyInto(out *SourceStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Provisioned != nil { + in, out := &in.Provisioned, &out.Provisioned + *out = make([]ProvisionedObjectStatus, len(*in)) + copy(*out, *in) + } out.Subscribable = in.Subscribable return } diff --git a/pkg/controller/owner_references.go b/pkg/controller/owner_references.go index 0ed772729af..5780dca9af9 100644 --- a/pkg/controller/owner_references.go +++ b/pkg/controller/owner_references.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" channelsv1alpha "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" feedsv1alpha "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" flowsv1alpha "github.com/knative/eventing/pkg/apis/flows/v1alpha1" ) @@ -51,6 +52,16 @@ func kind(obj metav1.Object) schema.GroupVersionKind { case *flowsv1alpha.Flow: return flowsv1alpha.SchemeGroupVersion.WithKind("Flow") + // Eventing + case *eventingv1alpha.Source: + return eventingv1alpha.SchemeGroupVersion.WithKind("Source") + case *eventingv1alpha.Channel: + return eventingv1alpha.SchemeGroupVersion.WithKind("Channel") + case *eventingv1alpha.ClusterProvisioner: + return eventingv1alpha.SchemeGroupVersion.WithKind("ClusterProvisioner") + case *eventingv1alpha.Subscription: + return eventingv1alpha.SchemeGroupVersion.WithKind("Subscription") + default: panic(fmt.Sprintf("Unsupported object type %T", obj)) } diff --git a/pkg/provisioners/container/config/container.yaml b/pkg/provisioners/container/config/container.yaml new file mode 100644 index 00000000000..cef82c759bc --- /dev/null +++ b/pkg/provisioners/container/config/container.yaml @@ -0,0 +1,8 @@ +apiVersion: eventing.knative.dev/v1alpha1 +kind: ClusterProvisioner +metadata: + name: container +spec: + reconciles: + group: eventing.knative.dev + kind: Source diff --git a/pkg/provisioners/container/controller/provider.go b/pkg/provisioners/container/controller/provider.go new file mode 100644 index 00000000000..0d502943486 --- /dev/null +++ b/pkg/provisioners/container/controller/provider.go @@ -0,0 +1,69 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/provisioners/sdk" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +const ( + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "container-provisioner-controller" +) + +type reconciler struct { + client client.Client + restConfig *rest.Config + dynamicClient dynamic.Interface + recorder record.EventRecorder +} + +// ProvideController returns a Subscription controller. +func ProvideController(mgr manager.Manager) (controller.Controller, error) { + p := &sdk.Provider{ + AgentName: controllerAgentName, + Parent: &v1alpha1.Source{}, + Owns: []runtime.Object{&appsv1.Deployment{}, &v1alpha1.Channel{}}, + Reconciler: &reconciler{ + recorder: mgr.GetRecorder(controllerAgentName), + }, + } + + return p.ProvideController(mgr) +} + +func (r *reconciler) InjectClient(c client.Client) error { + r.client = c + return nil +} + +func (r *reconciler) InjectConfig(c *rest.Config) error { + r.restConfig = c + var err error + r.dynamicClient, err = dynamic.NewForConfig(c) + return err +} diff --git a/pkg/provisioners/container/controller/reconcile.go b/pkg/provisioners/container/controller/reconcile.go new file mode 100644 index 00000000000..288c6bb5f22 --- /dev/null +++ b/pkg/provisioners/container/controller/reconcile.go @@ -0,0 +1,212 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "encoding/json" + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/provisioners/container/controller/resources" + "github.com/knative/pkg/logging" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + provisionerName = "container" +) + +// Reconcile compares the actual state with the desired, and attempts to +// converge the two. + +func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) (runtime.Object, error) { + logger := logging.FromContext(ctx) + + source, ok := object.(*v1alpha1.Source) + if !ok { + logger.Errorf("could not find source %v\n", object) + return object, nil + } + + if source.Spec.Provisioner.Ref.Name != provisionerName { + logger.Errorf("skipping source %s, provisioned by %s\n", source.Name, source.Spec.Provisioner.Ref.Name) + return source, nil + } + + // See if the source has been deleted + accessor, err := meta.Accessor(source) + if err != nil { + logger.Warnf("Failed to get metadata accessor: %s", err) + return object, err + } + // No need to reconcile if the source has been marked for deletion. + deletionTimestamp := accessor.GetDeletionTimestamp() + if deletionTimestamp != nil { + return object, err + } + + source.Status.InitializeConditions() + + args := &resources.ContainerArguments{} + if source.Spec.Arguments != nil { + if err := json.Unmarshal(source.Spec.Arguments.Raw, args); err != nil { + logger.Infof("Error: %s failed to unmarshal arguments, %v", source.Name, err) + } + } + args.Name = source.Name + args.Namespace = source.Namespace + + channel, err := r.getChannel(ctx, source) + if err != nil { + fqn := "Channel.eventing.knative.dev/v1alpha1" + if errors.IsNotFound(err) { + channel, err = r.createChannel(ctx, source, nil, args) + if err != nil { + return object, err + } + r.recorder.Eventf(source, corev1.EventTypeNormal, "Provisioned", "Created channel %q", channel.Name) + source.Status.SetProvisionedObjectState(channel.Name, fqn, "Created", "Created channel %q", channel.Name) + source.Status.MarkDeprovisioned("Provisioning", "Provisioning Channel %s", args.Name) + } else { + if channel.Status.IsReady() { + source.Status.SetProvisionedObjectState(channel.Name, fqn, "Ready", "") + } + } + + source.Status.Subscribable.Channelable.Namespace = channel.Namespace + source.Status.Subscribable.Channelable.Name = channel.Name + source.Status.Subscribable.Channelable.APIVersion = "eventing.knative.dev/v1alpha1" + source.Status.Subscribable.Channelable.Kind = "Channel" + + } + + deploy, err := r.getDeployment(ctx, source) + if err != nil { + fqn := "Deployment.apps/v1" + if errors.IsNotFound(err) { + deploy, err = r.createDeployment(ctx, source, nil, channel, args) + if err != nil { + r.recorder.Eventf(source, corev1.EventTypeNormal, "Blocked", "waiting for %v", err) + source.Status.SetProvisionedObjectState(args.Name, fqn, "Blocked", "waiting for %v", args.Name, err) + return object, err + } + r.recorder.Eventf(source, corev1.EventTypeNormal, "Provisioned", "Created deployment %q", deploy.Name) + source.Status.SetProvisionedObjectState(deploy.Name, fqn, "Created", "Created deployment %q", deploy.Name) + source.Status.MarkDeprovisioned("Provisioning", "Provisioning deployment %s", args.Name) + } else { + if deploy.Status.ReadyReplicas > 0 { + source.Status.SetProvisionedObjectState(deploy.Name, fqn, "Ready", "") + source.Status.MarkProvisioned() + } + } + } + + return source, nil +} + +func (r *reconciler) getChannel(ctx context.Context, source *v1alpha1.Source) (*v1alpha1.Channel, error) { + logger := logging.FromContext(ctx) + + list := &v1alpha1.ChannelList{} + err := r.client.List( + ctx, + &client.ListOptions{ + Namespace: source.Namespace, + LabelSelector: labels.Everything(), + // TODO this is here because the fake client needs it. Remove this when it's no longer + // needed. + Raw: &metav1.ListOptions{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + }, + }, + }, + list) + if err != nil { + logger.Errorf("Unable to list channels: %v", err) + return nil, err + } + for _, c := range list.Items { + if metav1.IsControlledBy(&c, source) { + return &c, nil + } + } + return nil, errors.NewNotFound(schema.GroupResource{}, "") +} + +func (r *reconciler) createChannel(ctx context.Context, source *v1alpha1.Source, org *v1alpha1.Channel, args *resources.ContainerArguments) (*v1alpha1.Channel, error) { + channel, err := resources.MakeChannel(source, org, args) + if err != nil { + return nil, err + } + + if err := r.client.Create(ctx, channel); err != nil { + return nil, err + } + return channel, nil +} + +func (r *reconciler) getDeployment(ctx context.Context, source *v1alpha1.Source) (*appsv1.Deployment, error) { + logger := logging.FromContext(ctx) + + list := &appsv1.DeploymentList{} + err := r.client.List( + ctx, + &client.ListOptions{ + Namespace: source.Namespace, + LabelSelector: labels.Everything(), + // TODO this is here because the fake client needs it. Remove this when it's no longer + // needed. + Raw: &metav1.ListOptions{ + TypeMeta: metav1.TypeMeta{ + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: "Deployment", + }, + }, + }, + list) + if err != nil { + logger.Errorf("Unable to list deployments: %v", err) + return nil, err + } + for _, c := range list.Items { + if metav1.IsControlledBy(&c, source) { + return &c, nil + } + } + return nil, errors.NewNotFound(schema.GroupResource{}, "") +} + +func (r *reconciler) createDeployment(ctx context.Context, source *v1alpha1.Source, org *appsv1.Deployment, channel *v1alpha1.Channel, args *resources.ContainerArguments) (*appsv1.Deployment, error) { + deployment, err := resources.MakeDeployment(source, org, channel, args) + if err != nil { + return nil, err + } + + if err := r.client.Create(ctx, deployment); err != nil { + return nil, err + } + return deployment, nil +} diff --git a/pkg/provisioners/container/controller/resources/arguments.go b/pkg/provisioners/container/controller/resources/arguments.go new file mode 100644 index 00000000000..1cdd0b369bc --- /dev/null +++ b/pkg/provisioners/container/controller/resources/arguments.go @@ -0,0 +1,24 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +type ContainerArguments struct { + Name string `json:"-"` + Namespace string `json:"-"` + Image string `json:"image"` + Args map[string]string `json:"args"` +} diff --git a/pkg/provisioners/container/controller/resources/channel.go b/pkg/provisioners/container/controller/resources/channel.go new file mode 100644 index 00000000000..93aa6d3da44 --- /dev/null +++ b/pkg/provisioners/container/controller/resources/channel.go @@ -0,0 +1,57 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/controller" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func MakeChannel(source *v1alpha1.Source, org *v1alpha1.Channel, args *ContainerArguments) (*v1alpha1.Channel, error) { + channel := &v1alpha1.Channel{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Channel", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: args.Name + "-", + Namespace: args.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *controller.NewControllerRef(source, false), + }, + }, + Spec: v1alpha1.ChannelSpec{ + Provisioner: &v1alpha1.ProvisionerReference{ + Ref: &corev1.ObjectReference{ + Name: "in-memory-channel", // TODO: this should be changeable + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "ClusterProvisioner", + }, + }, + }, + } + if org != nil { + // We need to keep the generation from the original channel until we use the correct + // generation generation. The knative/pkg webhook will reject the update if we do not + // do this. + channel.Spec.Generation = org.Spec.Generation + } + return channel, nil +} diff --git a/pkg/provisioners/container/controller/resources/deployment.go b/pkg/provisioners/container/controller/resources/deployment.go new file mode 100644 index 00000000000..5886f5758d3 --- /dev/null +++ b/pkg/provisioners/container/controller/resources/deployment.go @@ -0,0 +1,84 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "fmt" + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/controller" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func MakeDeployment(source *v1alpha1.Source, org *appsv1.Deployment, channel *v1alpha1.Channel, args *ContainerArguments) (*appsv1.Deployment, error) { + + if channel == nil || channel.Status.Sinkable.DomainInternal == "" { + return nil, fmt.Errorf("channel not ready") + } + + containerArgs := []string(nil) + if args != nil { + containerArgs = make([]string, 0, len(args.Args)+1) + for k, v := range args.Args { + containerArgs = append(containerArgs, fmt.Sprintf("--%s=%q", k, v)) + } + } + remote := fmt.Sprintf("--remote=http://%s", channel.Status.Sinkable.DomainInternal) + containerArgs = append(containerArgs, remote) + + deploy := &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + GenerateName: args.Name + "-", + Namespace: args.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *controller.NewControllerRef(source, false), + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: func() *int32 { var i int32 = 1; return &i }(), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "source": args.Name, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "source": args.Name, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "source", + Image: args.Image, + Args: containerArgs, + ImagePullPolicy: corev1.PullAlways, + }, + }, + }, + }, + }, + } + return deploy, nil +} diff --git a/pkg/provisioners/sdk/provider.go b/pkg/provisioners/sdk/provider.go new file mode 100644 index 00000000000..4dd714fdc72 --- /dev/null +++ b/pkg/provisioners/sdk/provider.go @@ -0,0 +1,75 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sdk + +import ( + "context" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +type KnativeReconciler interface { + Reconcile(ctx context.Context, object runtime.Object) (runtime.Object, error) + InjectClient(c client.Client) error + InjectConfig(c *rest.Config) error +} + +type Provider struct { + AgentName string + // Parent is a resource kind to reconcile with empty content. i.e. &v1.Parent{} + Parent runtime.Object + // Owns are dependent resources owned by the parent for which changes to + // those resources cause the Parent to be re-reconciled. This is a list of + // resources of kind with empty content. i.e. [&v1.Child{}] + Owns []runtime.Object + + Reconciler KnativeReconciler +} + +// ProvideController returns a controller for controller-runtime. +func (p *Provider) ProvideController(mgr manager.Manager) (controller.Controller, error) { + // Setup a new controller to Reconcile Subscriptions. + c, err := controller.New(p.AgentName, mgr, controller.Options{ + Reconciler: &reconciler{ + provider: *p, + recorder: mgr.GetRecorder(p.AgentName), + }, + }) + if err != nil { + return nil, err + } + + // Watch Parent events and enqueue Parent object key. + if err := c.Watch(&source.Kind{Type: p.Parent}, &handler.EnqueueRequestForObject{}); err != nil { + return nil, err + } + + // Watch and enqueue for owning obj key. + for _, t := range p.Owns { + if err := c.Watch(&source.Kind{Type: t}, + &handler.EnqueueRequestForOwner{OwnerType: p.Parent, IsController: true}); err != nil { + return nil, err + } + } + + return c, nil +} diff --git a/pkg/provisioners/sdk/reconciler.go b/pkg/provisioners/sdk/reconciler.go new file mode 100644 index 00000000000..3c63574812c --- /dev/null +++ b/pkg/provisioners/sdk/reconciler.go @@ -0,0 +1,145 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sdk + +import ( + "context" + "github.com/knative/pkg/logging" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +type reconciler struct { + client client.Client + restConfig *rest.Config + dynamicClient dynamic.Interface + recorder record.EventRecorder + + provider Provider +} + +// Verify the struct implements reconcile.Reconciler +var _ reconcile.Reconciler = &reconciler{} + +// Reconcile compares the actual state with the desired, and attempts to +// converge the two. +func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + ctx := context.TODO() + logger := logging.FromContext(ctx) + + logger.Infof("Reconciling %s %v", r.provider.Parent.GetObjectKind(), request) + + obj := r.provider.Parent.DeepCopyObject() + + err := r.client.Get(context.TODO(), request.NamespacedName, obj) + + if errors.IsNotFound(err) { + logger.Errorf("could not find %s %v\n", r.provider.Parent.GetObjectKind(), request) + return reconcile.Result{}, nil + } + + if err != nil { + logger.Errorf("could not fetch %s %v for %+v\n", r.provider.Parent.GetObjectKind(), err, request) + return reconcile.Result{}, err + } + + original := obj.DeepCopyObject() + + // Reconcile this copy of the Source and then write back any status + // updates regardless of whether the reconcile error out. + obj, err = r.provider.Reconciler.Reconcile(ctx, obj) + if err != nil { + logger.Warnf("Failed to reconcile %s: %v", r.provider.Parent.GetObjectKind(), err) + } + + if chg, err := r.statusHasChanged(ctx, original, obj); err != nil || !chg { + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the informer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + return reconcile.Result{}, err + } else if _, err := r.updateStatus(ctx, request, obj); err != nil { + logger.Warnf("Failed to update %s status: %v", r.provider.Parent.GetObjectKind(), err) + return reconcile.Result{}, err + } + + // Requeue if the resource is not ready: + return reconcile.Result{}, err +} + +func (r *reconciler) InjectClient(c client.Client) error { + r.client = c + if r.provider.Reconciler != nil { + r.provider.Reconciler.InjectClient(c) + } + return nil +} + +func (r *reconciler) InjectConfig(c *rest.Config) error { + r.restConfig = c + var err error + r.dynamicClient, err = dynamic.NewForConfig(c) + if r.provider.Reconciler != nil { + r.provider.Reconciler.InjectConfig(c) + } + return err +} + +func (r *reconciler) statusHasChanged(ctx context.Context, old, new runtime.Object) (bool, error) { + if old == nil { + return true, nil + } + + o := NewReflectedStatusAccessor(old) + n := NewReflectedStatusAccessor(new) + + oStatus := o.GetStatus() + nStatus := n.GetStatus() + + if equality.Semantic.DeepEqual(oStatus, nStatus) { + return false, nil + } + + return true, nil +} + +func (r *reconciler) updateStatus(ctx context.Context, request reconcile.Request, object runtime.Object) (runtime.Object, error) { + freshObj := r.provider.Parent.DeepCopyObject() + if err := r.client.Get(ctx, request.NamespacedName, freshObj); err != nil { + return nil, err + } + + fresh := NewReflectedStatusAccessor(freshObj) + org := NewReflectedStatusAccessor(object) + + fresh.SetStatus(org.GetStatus()) + + // Until #38113 is merged, we must use Update instead of UpdateStatus to + // update the Status block of the Source resource. UpdateStatus will not + // allow changes to the Spec of the resource, which is ideal for ensuring + // nothing other than resource status has been updated. + if err := r.client.Update(ctx, freshObj); err != nil { + return nil, err + } + return freshObj, nil +} diff --git a/pkg/provisioners/sdk/status_accessor.go b/pkg/provisioners/sdk/status_accessor.go new file mode 100644 index 00000000000..ce5712b268f --- /dev/null +++ b/pkg/provisioners/sdk/status_accessor.go @@ -0,0 +1,75 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sdk + +import ( + "reflect" +) + +// Conditions is the interface for a Resource that implements the getter and +// setter for accessing a Condition collection. +// +k8s:deepcopy-gen=true +type StatusAccessor interface { + GetStatus() interface{} + SetStatus(interface{}) +} + +// NewReflectedConditionsAccessor uses reflection to return a ConditionsAccessor +// to access the field called "Conditions". +func NewReflectedStatusAccessor(object interface{}) StatusAccessor { + objectValue := reflect.Indirect(reflect.ValueOf(object)) + + // If object is not a struct, don't even try to use it. + if objectValue.Kind() != reflect.Struct { + return nil + } + + statusField := objectValue.FieldByName("Status") + + if statusField.IsValid() && statusField.CanInterface() && statusField.CanSet() { + if _, ok := statusField.Interface().(interface{}); ok { + return &reflectedStatusAccessor{ + status: statusField, + } + } + } + return nil +} + +// reflectedConditionsAccessor is an internal wrapper object to act as the +// ConditionsAccessor for status objects that do not implement ConditionsAccessor +// directly, but do expose the field using the "Conditions" field name. +type reflectedStatusAccessor struct { + status reflect.Value +} + +// GetConditions uses reflection to return Conditions from the held status object. +func (r *reflectedStatusAccessor) GetStatus() interface{} { + if r != nil && r.status.IsValid() && r.status.CanInterface() { + if status, ok := r.status.Interface().(interface{}); ok { + return status + } + } + return nil +} + +// SetConditions uses reflection to set Conditions on the held status object. +func (r *reflectedStatusAccessor) SetStatus(status interface{}) { + if r != nil && r.status.IsValid() && r.status.CanSet() { + r.status.Set(reflect.ValueOf(status)) + } +} diff --git a/pkg/sources/heartbeats/cmd/main.go b/pkg/sources/heartbeats/cmd/main.go new file mode 100644 index 00000000000..99a94f02568 --- /dev/null +++ b/pkg/sources/heartbeats/cmd/main.go @@ -0,0 +1,72 @@ +package main + +import ( + "bytes" + "encoding/json" + "flag" + "io" + "log" + "net/http" + "strconv" + "strings" + "time" +) + +type Heartbeat struct { + Sequence int `json:"id"` + Label string `json:"label"` +} + +var ( + remote string + label string + periodStr string + period int + sequence int + hb *Heartbeat +) + +func init() { + flag.StringVar(&remote, "remote", "", "the host url to heartbeat to") + flag.StringVar(&label, "label", "", "a special label") + flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats") +} + +func main() { + flag.Parse() + + hb = &Heartbeat{ + Sequence: 0, + Label: label, + } + + period, err := strconv.Atoi(periodStr) + if err != nil { + period = 5 + } + + for { + send() + time.Sleep(time.Duration(period) * time.Second) + } +} + +func send() { + hb.Sequence++ + resp, err := http.Post(remote, "application/json", body()) + if err != nil { + log.Printf("Unable to make request: %+v, %v", hb, err) + return + } else { + log.Printf("[%d]: %+v", resp.StatusCode, hb) + } + defer resp.Body.Close() +} + +func body() io.Reader { + b, err := json.Marshal(hb) + if err != nil { + return strings.NewReader("{\"error\":\"true\"}") + } + return bytes.NewBuffer(b) +} diff --git a/pkg/sources/heartbeats/config/source.yaml b/pkg/sources/heartbeats/config/source.yaml new file mode 100644 index 00000000000..32aea6d35d8 --- /dev/null +++ b/pkg/sources/heartbeats/config/source.yaml @@ -0,0 +1,14 @@ +apiVersion: eventing.knative.dev/v1alpha1 +kind: Source +metadata: + name: heartbeats + namespace: default +spec: + provisioner: + ref: + name: container + arguments: + image: github.com/knative/eventing/pkg/sources/heartbeats/cmd/ + args: + label: "<3" + period: "2" diff --git a/pkg/sources/heartbeats/config/subscription.yaml b/pkg/sources/heartbeats/config/subscription.yaml new file mode 100644 index 00000000000..d1b361bcfeb --- /dev/null +++ b/pkg/sources/heartbeats/config/subscription.yaml @@ -0,0 +1,15 @@ +apiVersion: eventing.knative.dev/v1alpha1 +kind: Subscription +metadata: + name: heartbeats + namespace: default +spec: + from: + kind: Source + apiVersion: eventing.knative.dev/v1alpha1 + name: heartbeats + call: + target: + kind: Service + apiVersion: serving.knative.dev/v1alpha1 + name: message-dumper diff --git a/third_party/VENDOR-LICENSE b/third_party/VENDOR-LICENSE index 45ce934d7ba..e1969f98375 100644 --- a/third_party/VENDOR-LICENSE +++ b/third_party/VENDOR-LICENSE @@ -4493,6 +4493,39 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +=========================================================== +Import: github.com/knative/eventing/vendor/golang.org/x/sync + +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + + =========================================================== Import: github.com/knative/eventing/vendor/golang.org/x/sys