From 280adb584e655140607ca5d4ad889931b3d9aee5 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 10 Oct 2018 11:44:09 -0700 Subject: [PATCH 01/10] checkpoint. --- Gopkg.lock | 2 +- cmd/controller/controller-runtime-main.go | 2 + config/500-controller.yaml | 2 +- pkg/apis/eventing/v1alpha1/channel_types.go | 10 + pkg/apis/eventing/v1alpha1/register.go | 2 + pkg/apis/eventing/v1alpha1/source_types.go | 38 ++++ .../v1alpha1/zz_generated.deepcopy.go | 21 ++ pkg/controller/owner_references.go | 11 + pkg/provisioners/heartbeats/cmd/main.go | 63 ++++++ .../heartbeats/config/examples/source.yaml | 12 ++ .../config/examples/subscription.yaml | 15 ++ .../heartbeats/config/heartbeat.yaml | 8 + .../heartbeats/controller/provider.go | 78 +++++++ .../heartbeats/controller/reconcile.go | 193 ++++++++++++++++++ .../controller/resources/arguments.go | 24 +++ .../controller/resources/channel.go | 54 +++++ .../heartbeats/controller/resources/names.go | 27 +++ .../heartbeats/controller/resources/pod.go | 75 +++++++ 18 files changed, 635 insertions(+), 2 deletions(-) create mode 100644 pkg/provisioners/heartbeats/cmd/main.go create mode 100644 pkg/provisioners/heartbeats/config/examples/source.yaml create mode 100644 pkg/provisioners/heartbeats/config/examples/subscription.yaml create mode 100644 pkg/provisioners/heartbeats/config/heartbeat.yaml create mode 100644 pkg/provisioners/heartbeats/controller/provider.go create mode 100644 pkg/provisioners/heartbeats/controller/reconcile.go create mode 100644 pkg/provisioners/heartbeats/controller/resources/arguments.go create mode 100644 pkg/provisioners/heartbeats/controller/resources/channel.go create mode 100644 pkg/provisioners/heartbeats/controller/resources/names.go create mode 100644 pkg/provisioners/heartbeats/controller/resources/pod.go diff --git a/Gopkg.lock b/Gopkg.lock index 56a689ad680..01f393fa2e4 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -268,7 +268,7 @@ revision = "5c1d8c8469d1ed34b2aecf4c2305b3a57fff2ee3" [[projects]] - digest = "1:3032bf41e1ec7fe0093c6db659b3cf202ef528c421c26376a7fe209467a9fd74" + digest = "1:a3f465e8fba2ec1a371c52063ce81f17fbf7356a8cadbc160b2f94796ff5e785" name = "github.com/knative/pkg" packages = [ "apis", diff --git a/cmd/controller/controller-runtime-main.go b/cmd/controller/controller-runtime-main.go index bac956c5037..1673ad68939 100644 --- a/cmd/controller/controller-runtime-main.go +++ b/cmd/controller/controller-runtime-main.go @@ -23,6 +23,7 @@ import ( "github.com/knative/eventing/pkg/controller/eventing/subscription" "github.com/knative/eventing/pkg/controller/feed" "github.com/knative/eventing/pkg/controller/flow" + heartbeatcontroller "github.com/knative/eventing/pkg/provisioners/heartbeats/controller" "go.uber.org/zap" "strings" @@ -49,6 +50,7 @@ type ProvideFunc func(manager.Manager) (controller.Controller, error) // be added to the default providers list. var ExperimentalControllers = map[string]ProvideFunc{ "subscription.eventing.knative.dev": subscription.ProvideController, + "heartbeats-provisioner": heartbeatcontroller.ProvideController, } // controllerRuntimeStart runs controllers written for controller-runtime. It's diff --git a/config/500-controller.yaml b/config/500-controller.yaml index 03570245ab7..83d22d65e3b 100644 --- a/config/500-controller.yaml +++ b/config/500-controller.yaml @@ -31,7 +31,7 @@ spec: args: [ "-logtostderr", "-stderrthreshold", "INFO", - "--experimentalControllers=subscription.eventing.knative.dev" # comma separated list. + "--experimentalControllers=subscription.eventing.knative.dev,heartbeats-provisioner" # comma separated list. ] volumeMounts: - name: config-logging diff --git a/pkg/apis/eventing/v1alpha1/channel_types.go b/pkg/apis/eventing/v1alpha1/channel_types.go index 316f2f7e407..5c809cfbd65 100644 --- a/pkg/apis/eventing/v1alpha1/channel_types.go +++ b/pkg/apis/eventing/v1alpha1/channel_types.go @@ -117,6 +117,16 @@ func (cs *ChannelStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha return chanCondSet.Manage(cs).GetCondition(t) } +// IsReady returns true if the resource is ready overall. +func (cs *ChannelStatus) IsReady() bool { + return chanCondSet.Manage(cs).IsHappy() +} + +// InitializeConditions sets relevant unset conditions to Unknown state. +func (cs *ChannelStatus) InitializeConditions() { + chanCondSet.Manage(cs).InitializeConditions() +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // ChannelList is a collection of Channels. diff --git a/pkg/apis/eventing/v1alpha1/register.go b/pkg/apis/eventing/v1alpha1/register.go index e3fb19db05d..31536db144e 100644 --- a/pkg/apis/eventing/v1alpha1/register.go +++ b/pkg/apis/eventing/v1alpha1/register.go @@ -49,6 +49,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ChannelList{}, &ClusterProvisioner{}, &ClusterProvisionerList{}, + &Source{}, + &SourceList{}, &Subscription{}, &SubscriptionList{}, ) diff --git a/pkg/apis/eventing/v1alpha1/source_types.go b/pkg/apis/eventing/v1alpha1/source_types.go index 5b58b416b95..29ded4077f0 100644 --- a/pkg/apis/eventing/v1alpha1/source_types.go +++ b/pkg/apis/eventing/v1alpha1/source_types.go @@ -17,6 +17,7 @@ limitations under the License. package v1alpha1 import ( + "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/knative/pkg/apis/duck" @@ -111,6 +112,12 @@ type SourceStatus struct { // +patchStrategy=merge Conditions duckv1alpha1.Conditions `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"` + // Provisioned holds the status of a Provisioned Object at a point in time. + // +optional + // +patchMergeKey=name + // +patchStrategy=merge + Provisioned []ProvisionedObjectStatus `json:"provisioned,omitempty" patchStrategy:"merge" patchMergeKey:"name"` + // ObservedGeneration is the 'Generation' of the Source that // was last reconciled by the controller. // +optional @@ -120,6 +127,17 @@ type SourceStatus struct { Subscribable duckv1alpha1.Subscribable `json:"subscribable,omitempty"` } +type ProvisionedObjectStatus struct { + // Name of Object + Name string `json:"name,omitempty"` + // Type is the fully qualified object type. + Type string `json:"type,omitempty"` + // Status is the current relationship between Source and Object. + Status string `json:"status,omitempty"` + // Reason is the detailed description describing current relationship status. + Reason string `json:"reason,omitempty"` +} + // GetCondition returns the condition currently associated with the given type, or nil. func (ss *SourceStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition { return sourceCondSet.Manage(ss).GetCondition(t) @@ -145,6 +163,26 @@ func (ss *SourceStatus) MarkDeprovisioned(reason, messageFormat string, messageA sourceCondSet.Manage(ss).MarkFalse(SourceConditionProvisioned, reason, messageFormat, messageA...) } +// MarkProvisioned sets the condition that the source has had its backing resources created. +func (ss *SourceStatus) SetProvisionedObjectState(name, objType, status, reasonFormat string, reasonA ...interface{}) { + reason := fmt.Sprintf(reasonFormat, reasonA...) + newP := ProvisionedObjectStatus{ + Name: name, + Type: objType, + Status: status, + Reason: reason, + } + newProvisioned := make([]ProvisionedObjectStatus, 0, len(ss.Provisioned)) + for _, p := range ss.Provisioned { + if p.Name == newP.Name { + newProvisioned = append(newProvisioned, newP) + } else { + newProvisioned = append(newProvisioned, p) + } + } + ss.Provisioned = newProvisioned +} + // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // SourceList is a list of Source resources diff --git a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go index dcbd9410622..abbe151f621 100644 --- a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -290,6 +290,22 @@ func (in *ClusterProvisionerStatus) DeepCopy() *ClusterProvisionerStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ProvisionedObjectStatus) DeepCopyInto(out *ProvisionedObjectStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ProvisionedObjectStatus. +func (in *ProvisionedObjectStatus) DeepCopy() *ProvisionedObjectStatus { + if in == nil { + return nil + } + out := new(ProvisionedObjectStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ProvisionerReference) DeepCopyInto(out *ProvisionerReference) { *out = *in @@ -454,6 +470,11 @@ func (in *SourceStatus) DeepCopyInto(out *SourceStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Provisioned != nil { + in, out := &in.Provisioned, &out.Provisioned + *out = make([]ProvisionedObjectStatus, len(*in)) + copy(*out, *in) + } out.Subscribable = in.Subscribable return } diff --git a/pkg/controller/owner_references.go b/pkg/controller/owner_references.go index 0ed772729af..5780dca9af9 100644 --- a/pkg/controller/owner_references.go +++ b/pkg/controller/owner_references.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" channelsv1alpha "github.com/knative/eventing/pkg/apis/channels/v1alpha1" + eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" feedsv1alpha "github.com/knative/eventing/pkg/apis/feeds/v1alpha1" flowsv1alpha "github.com/knative/eventing/pkg/apis/flows/v1alpha1" ) @@ -51,6 +52,16 @@ func kind(obj metav1.Object) schema.GroupVersionKind { case *flowsv1alpha.Flow: return flowsv1alpha.SchemeGroupVersion.WithKind("Flow") + // Eventing + case *eventingv1alpha.Source: + return eventingv1alpha.SchemeGroupVersion.WithKind("Source") + case *eventingv1alpha.Channel: + return eventingv1alpha.SchemeGroupVersion.WithKind("Channel") + case *eventingv1alpha.ClusterProvisioner: + return eventingv1alpha.SchemeGroupVersion.WithKind("ClusterProvisioner") + case *eventingv1alpha.Subscription: + return eventingv1alpha.SchemeGroupVersion.WithKind("Subscription") + default: panic(fmt.Sprintf("Unsupported object type %T", obj)) } diff --git a/pkg/provisioners/heartbeats/cmd/main.go b/pkg/provisioners/heartbeats/cmd/main.go new file mode 100644 index 00000000000..6f8749157d8 --- /dev/null +++ b/pkg/provisioners/heartbeats/cmd/main.go @@ -0,0 +1,63 @@ +package main + +import ( + "bytes" + "encoding/json" + "flag" + "io" + "log" + "net/http" + "strings" + "time" +) + +type heartbeat struct { + Sequence int `json:"id"` + Label string `json:"label"` +} + +var ( + remote string + label string + period int + sequence int + hb *heartbeat +) + +func init() { + flag.StringVar(&remote, "remote", "", "the host url to heartbeat to") + flag.StringVar(&label, "label", "", "a special label") + flag.IntVar(&period, "period", 5, "the number of seconds between heartbeats") +} + +func main() { + flag.Parse() + + hb = &heartbeat{ + Sequence: 0, + Label: label, + } + + for { + send() + time.Sleep(time.Duration(period) * time.Second) + } +} + +func send() { + sequence++ + resp, err := http.Post(remote, "application/json", body()) + if err != nil { + log.Printf("Unable to make request: %v, %+v", sequence, err) + return + } + defer resp.Body.Close() +} + +func body() io.Reader { + b, err := json.Marshal(hb) + if err == nil { + return strings.NewReader("{\"error\":\"true\"}") + } + return bytes.NewBuffer(b) +} diff --git a/pkg/provisioners/heartbeats/config/examples/source.yaml b/pkg/provisioners/heartbeats/config/examples/source.yaml new file mode 100644 index 00000000000..e5b29856fc9 --- /dev/null +++ b/pkg/provisioners/heartbeats/config/examples/source.yaml @@ -0,0 +1,12 @@ +apiVersion: eventing.knative.dev/v1alpha1 +kind: Source +metadata: + name: love + namespace: default +spec: + provisioner: + ref: + name: heartbeats + arguments: + label: "<3" + period: 2 diff --git a/pkg/provisioners/heartbeats/config/examples/subscription.yaml b/pkg/provisioners/heartbeats/config/examples/subscription.yaml new file mode 100644 index 00000000000..d879d083e5a --- /dev/null +++ b/pkg/provisioners/heartbeats/config/examples/subscription.yaml @@ -0,0 +1,15 @@ +apiVersion: eventing.knative.dev/v1alpha1 +kind: Subscription +metadata: + name: love-dumper + namespace: default +spec: + from: + kind: Source + apiVersion: eventing.knative.dev/v1alpha1 + name: love + call: + target: + kind: Service + apiVersion: serving.knative.dev/v1alpha1 + name: message-dumper diff --git a/pkg/provisioners/heartbeats/config/heartbeat.yaml b/pkg/provisioners/heartbeats/config/heartbeat.yaml new file mode 100644 index 00000000000..5f9ca6f7fda --- /dev/null +++ b/pkg/provisioners/heartbeats/config/heartbeat.yaml @@ -0,0 +1,8 @@ +apiVersion: eventing.knative.dev/v1alpha1 +kind: ClusterProvisioner +metadata: + name: heartbeats +spec: + reconciles: + group: eventing.knative.dev + kind: Source diff --git a/pkg/provisioners/heartbeats/controller/provider.go b/pkg/provisioners/heartbeats/controller/provider.go new file mode 100644 index 00000000000..0629a030f53 --- /dev/null +++ b/pkg/provisioners/heartbeats/controller/provider.go @@ -0,0 +1,78 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +const ( + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "heartbeats-provisioner-controller" +) + +type reconciler struct { + client client.Client + restConfig *rest.Config + dynamicClient dynamic.Interface + recorder record.EventRecorder +} + +// Verify the struct implements reconcile.Reconciler +var _ reconcile.Reconciler = &reconciler{} + +// ProvideController returns a Subscription controller. +func ProvideController(mgr manager.Manager) (controller.Controller, error) { + // Setup a new controller to Reconcile Subscriptions. + c, err := controller.New(controllerAgentName, mgr, controller.Options{ + Reconciler: &reconciler{ + recorder: mgr.GetRecorder(controllerAgentName), + }, + }) + if err != nil { + return nil, err + } + + // Watch Source events and enqueue Source object key. + if err := c.Watch(&source.Kind{Type: &v1alpha1.Source{}}, &handler.EnqueueRequestForObject{}); err != nil { + return nil, err + } + + return c, nil +} + +func (r *reconciler) InjectClient(c client.Client) error { + r.client = c + return nil +} + +func (r *reconciler) InjectConfig(c *rest.Config) error { + r.restConfig = c + var err error + r.dynamicClient, err = dynamic.NewForConfig(c) + return err +} diff --git a/pkg/provisioners/heartbeats/controller/reconcile.go b/pkg/provisioners/heartbeats/controller/reconcile.go new file mode 100644 index 00000000000..bc650c94c80 --- /dev/null +++ b/pkg/provisioners/heartbeats/controller/reconcile.go @@ -0,0 +1,193 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "encoding/json" + "fmt" + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/provisioners/heartbeats/controller/resources" + "github.com/knative/pkg/logging" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +const ( + provisionerName = "heartbeats" +) + +// Reconcile compares the actual state with the desired, and attempts to +// converge the two. +func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + ctx := context.TODO() + logger := logging.FromContext(ctx) + + logger.Infof("Reconciling source %v", request) + source := &v1alpha1.Source{} + err := r.client.Get(context.TODO(), request.NamespacedName, source) + + if errors.IsNotFound(err) { + logger.Errorf("could not find source %v\n", request) + return reconcile.Result{}, nil + } + + if err != nil { + logger.Errorf("could not fetch Source %v for %+v\n", err, request) + return reconcile.Result{}, err + } + + if source.Spec.Provisioner.Ref.Name != provisionerName { + logger.Errorf("heartbeats skipping source %s, provisioned by %s\n", source.Name, source.Spec.Provisioner.Ref.Name) + return reconcile.Result{}, nil + } + + original := source.DeepCopy() + + // Reconcile this copy of the Source and then write back any status + // updates regardless of whether the reconcile error out. + err = r.reconcile(ctx, source) + if err != nil { + logger.Warnf("Failed to reconcile source: %v", err) + } + if equality.Semantic.DeepEqual(original.Status, source.Status) { + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the informer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + } else if _, err := r.updateStatus(source); err != nil { + logger.Warnf("Failed to update source status: %v", err) + return reconcile.Result{}, err + } + + // Requeue if the resource is not ready: + return reconcile.Result{}, err +} + +func (r *reconciler) reconcile(ctx context.Context, source *v1alpha1.Source) error { + logger := logging.FromContext(ctx) + + source.Status.InitializeConditions() + + // TODO: + //// See if the source has been deleted + //accessor, err := meta.Accessor(source) + //if err != nil { + // logger.Warnf("Failed to get metadata accessor: %s", err) + // return err + //} + //deletionTimestamp := accessor.GetDeletionTimestamp() + + args := &resources.HeartBeatArguments{} + + if source.Spec.Arguments != nil { + if err := json.Unmarshal(source.Spec.Arguments.Raw, args); err != nil { + logger.Infof("Error: %s failed to unmarshal arguments, %v", source.Name, err) + } + } + args.Name = source.Name + args.Namespace = source.Namespace + + channel := &v1alpha1.Channel{} + if err := r.client.Get(ctx, client.ObjectKey{Namespace: args.Namespace, Name: args.Name}, channel); err != nil { + fqn := fmt.Sprintf("%s/%s", channel.Kind, channel.APIVersion) + if errors.IsNotFound(err) { + channel, err = r.createChannel(ctx, source, nil, args) + if err != nil { + return err + } + r.recorder.Eventf(source, corev1.EventTypeNormal, "Provisioned", "Created channel %q", channel.Name) + source.Status.SetProvisionedObjectState(channel.Name, fqn, "Created", "Created channel %q", channel.Name) + source.Status.MarkDeprovisioned("Provisioning", "Provisioning Channel %s", args.Name) + } else { + if channel.Status.IsReady() { + source.Status.SetProvisionedObjectState(channel.Name, fqn, "Ready", "") + } + } + } + + pod := &corev1.Pod{} + if err := r.client.Get(ctx, client.ObjectKey{Namespace: args.Namespace, Name: args.Name}, pod); err != nil { + fqn := fmt.Sprintf("%s/%s", pod.Kind, pod.APIVersion) + if errors.IsNotFound(err) { + pod, err = r.createPod(ctx, source, nil, channel, args) + if err != nil { + r.recorder.Eventf(source, corev1.EventTypeNormal, "Blocked", "waiting for %v", err) + source.Status.SetProvisionedObjectState(args.Name, fqn, "Blocked", "waiting for %v", args.Name, err) + return err + } + r.recorder.Eventf(source, corev1.EventTypeNormal, "Provisioned", "Created pod %q", pod.Name) + source.Status.SetProvisionedObjectState(pod.Name, fqn, "Created", "Created pod %q", pod.Name) + source.Status.MarkDeprovisioned("Provisioning", "Provisioning Pod %s", args.Name) + } else { + if pod.Status.Phase == corev1.PodRunning { + source.Status.SetProvisionedObjectState(pod.Name, fqn, "Ready", "") + source.Status.MarkProvisioned() + } + } + } + + // TODO: need informers for the object we are controlling + + return nil +} + +func (r *reconciler) updateStatus(source *v1alpha1.Source) (*v1alpha1.Source, error) { + newSource := &v1alpha1.Source{} + err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: source.Namespace, Name: source.Name}, newSource) + + if err != nil { + return nil, err + } + newSource.Status = source.Status + + // Until #38113 is merged, we must use Update instead of UpdateStatus to + // update the Status block of the Source resource. UpdateStatus will not + // allow changes to the Spec of the resource, which is ideal for ensuring + // nothing other than resource status has been updated. + if err = r.client.Update(context.TODO(), newSource); err != nil { + return nil, err + } + return newSource, nil +} + +func (r *reconciler) createChannel(ctx context.Context, source *v1alpha1.Source, org *v1alpha1.Channel, args *resources.HeartBeatArguments) (*v1alpha1.Channel, error) { + channel, err := resources.MakeChannel(source, org, args) + if err != nil { + return nil, err + } + + if err := r.client.Create(ctx, channel); err != nil { + return nil, err + } + return channel, nil +} + +func (r *reconciler) createPod(ctx context.Context, source *v1alpha1.Source, org *corev1.Pod, channel *v1alpha1.Channel, args *resources.HeartBeatArguments) (*corev1.Pod, error) { + pod, err := resources.MakePod(source, org, channel, args) + if err != nil { + return nil, err + } + + if err := r.client.Create(ctx, pod); err != nil { + return nil, err + } + return pod, nil +} diff --git a/pkg/provisioners/heartbeats/controller/resources/arguments.go b/pkg/provisioners/heartbeats/controller/resources/arguments.go new file mode 100644 index 00000000000..6ab3b5f6db6 --- /dev/null +++ b/pkg/provisioners/heartbeats/controller/resources/arguments.go @@ -0,0 +1,24 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +type HeartBeatArguments struct { + Name string `json:"-"` + Namespace string `json:"-"` + Label string `json:"label"` + Period int `json:"period"` +} diff --git a/pkg/provisioners/heartbeats/controller/resources/channel.go b/pkg/provisioners/heartbeats/controller/resources/channel.go new file mode 100644 index 00000000000..7ec2f11b1cc --- /dev/null +++ b/pkg/provisioners/heartbeats/controller/resources/channel.go @@ -0,0 +1,54 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/controller" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func MakeChannel(source *v1alpha1.Source, org *v1alpha1.Channel, args *HeartBeatArguments) (*v1alpha1.Channel, error) { + channel := &v1alpha1.Channel{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Channel", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: args.Name, + Namespace: args.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *controller.NewControllerRef(source, false), + }, + }, + Spec: v1alpha1.ChannelSpec{ + Provisioner: &v1alpha1.ProvisionerReference{ + Ref: &corev1.ObjectReference{ + Name: "in-memory-bus-provisioner", + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "ClusterProvisioner", + }, + }, + }, + } + if org != nil { + channel.Spec.Generation = org.Spec.Generation + } + return channel, nil +} diff --git a/pkg/provisioners/heartbeats/controller/resources/names.go b/pkg/provisioners/heartbeats/controller/resources/names.go new file mode 100644 index 00000000000..3b19493ddc8 --- /dev/null +++ b/pkg/provisioners/heartbeats/controller/resources/names.go @@ -0,0 +1,27 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +// ChannelName returns the name of the Channel for this source. +func ChannelName(args *HeartBeatArguments) string { + return args.Name +} + +// PodName returns the name of the Pod for this source. +func PodName(args *HeartBeatArguments) string { + return args.Name +} diff --git a/pkg/provisioners/heartbeats/controller/resources/pod.go b/pkg/provisioners/heartbeats/controller/resources/pod.go new file mode 100644 index 00000000000..9750b8c2ddf --- /dev/null +++ b/pkg/provisioners/heartbeats/controller/resources/pod.go @@ -0,0 +1,75 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/controller" + + "fmt" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + image = "gcr.io/plori-nicholss/cmd-f2e9e0e7e0a0f8a66b5b6f8d85f4c98b@sha256:76ae26763af3e8ce04f78a056de610ac6625b51fa9d6f72adb70788c9d3a9779" +) + +func MakePod(source *v1alpha1.Source, org *corev1.Pod, channel *v1alpha1.Channel, args *HeartBeatArguments) (*corev1.Pod, error) { + + if channel == nil || channel.Status.Sinkable.DomainInternal == "" { + return nil, fmt.Errorf("channel not ready") + } + + remote := fmt.Sprintf("--remote=http://%s", channel.Status.Sinkable.DomainInternal) + period := "" + if args.Period > 0 { + period = fmt.Sprintf("--period=%d", args.Period) + } + label := "" + if args.Label != "" { + period = fmt.Sprintf("--label=%s", args.Label) + } + + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: args.Name, + Namespace: args.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *controller.NewControllerRef(source, false), + }, + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyOnFailure, + Containers: []corev1.Container{ + { + Name: "heartbeat", + Image: image, + Args: []string{ + remote, period, label, + }, + }, + }, + }, + } + return pod, nil +} From 9d150e70f694285f34891174884ca426db766073 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 10 Oct 2018 13:22:34 -0700 Subject: [PATCH 02/10] checkpoint. --- pkg/provisioners/heartbeats/controller/resources/channel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/provisioners/heartbeats/controller/resources/channel.go b/pkg/provisioners/heartbeats/controller/resources/channel.go index 7ec2f11b1cc..5428a242797 100644 --- a/pkg/provisioners/heartbeats/controller/resources/channel.go +++ b/pkg/provisioners/heartbeats/controller/resources/channel.go @@ -40,7 +40,7 @@ func MakeChannel(source *v1alpha1.Source, org *v1alpha1.Channel, args *HeartBeat Spec: v1alpha1.ChannelSpec{ Provisioner: &v1alpha1.ProvisionerReference{ Ref: &corev1.ObjectReference{ - Name: "in-memory-bus-provisioner", + Name: "in-memory-channel", APIVersion: "eventing.knative.dev/v1alpha1", Kind: "ClusterProvisioner", }, From db2a8280a0edf832d7f19455564221a77271578a Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 10 Oct 2018 15:09:22 -0700 Subject: [PATCH 03/10] worked --- pkg/provisioners/heartbeats/cmd/main.go | 12 ++++++------ .../heartbeats/config/examples/source.yaml | 2 +- pkg/provisioners/heartbeats/controller/reconcile.go | 6 ++++++ .../heartbeats/controller/resources/channel.go | 4 ++-- .../heartbeats/controller/resources/pod.go | 9 ++++++--- 5 files changed, 21 insertions(+), 12 deletions(-) diff --git a/pkg/provisioners/heartbeats/cmd/main.go b/pkg/provisioners/heartbeats/cmd/main.go index 6f8749157d8..e506c604c9f 100644 --- a/pkg/provisioners/heartbeats/cmd/main.go +++ b/pkg/provisioners/heartbeats/cmd/main.go @@ -11,7 +11,7 @@ import ( "time" ) -type heartbeat struct { +type Heartbeat struct { Sequence int `json:"id"` Label string `json:"label"` } @@ -21,7 +21,7 @@ var ( label string period int sequence int - hb *heartbeat + hb *Heartbeat ) func init() { @@ -33,7 +33,7 @@ func init() { func main() { flag.Parse() - hb = &heartbeat{ + hb = &Heartbeat{ Sequence: 0, Label: label, } @@ -45,10 +45,10 @@ func main() { } func send() { - sequence++ + hb.Sequence++ resp, err := http.Post(remote, "application/json", body()) if err != nil { - log.Printf("Unable to make request: %v, %+v", sequence, err) + log.Printf("Unable to make request: %+v, %v", hb, err) return } defer resp.Body.Close() @@ -56,7 +56,7 @@ func send() { func body() io.Reader { b, err := json.Marshal(hb) - if err == nil { + if err != nil { return strings.NewReader("{\"error\":\"true\"}") } return bytes.NewBuffer(b) diff --git a/pkg/provisioners/heartbeats/config/examples/source.yaml b/pkg/provisioners/heartbeats/config/examples/source.yaml index e5b29856fc9..41bf5e99982 100644 --- a/pkg/provisioners/heartbeats/config/examples/source.yaml +++ b/pkg/provisioners/heartbeats/config/examples/source.yaml @@ -8,5 +8,5 @@ spec: ref: name: heartbeats arguments: - label: "<3" + label: "grant hello" period: 2 diff --git a/pkg/provisioners/heartbeats/controller/reconcile.go b/pkg/provisioners/heartbeats/controller/reconcile.go index bc650c94c80..9cb59c99062 100644 --- a/pkg/provisioners/heartbeats/controller/reconcile.go +++ b/pkg/provisioners/heartbeats/controller/reconcile.go @@ -121,6 +121,12 @@ func (r *reconciler) reconcile(ctx context.Context, source *v1alpha1.Source) err source.Status.SetProvisionedObjectState(channel.Name, fqn, "Ready", "") } } + + source.Status.Subscribable.Channelable.Namespace = channel.Namespace + source.Status.Subscribable.Channelable.Name = channel.Name + source.Status.Subscribable.Channelable.APIVersion = "eventing.knative.dev/v1alpha1" + source.Status.Subscribable.Channelable.Kind = "Channel" + } pod := &corev1.Pod{} diff --git a/pkg/provisioners/heartbeats/controller/resources/channel.go b/pkg/provisioners/heartbeats/controller/resources/channel.go index 5428a242797..a0be782c0b1 100644 --- a/pkg/provisioners/heartbeats/controller/resources/channel.go +++ b/pkg/provisioners/heartbeats/controller/resources/channel.go @@ -31,8 +31,8 @@ func MakeChannel(source *v1alpha1.Source, org *v1alpha1.Channel, args *HeartBeat Kind: "Channel", }, ObjectMeta: metav1.ObjectMeta{ - Name: args.Name, - Namespace: args.Namespace, + GenerateName: args.Name, + Namespace: args.Namespace, OwnerReferences: []metav1.OwnerReference{ *controller.NewControllerRef(source, false), }, diff --git a/pkg/provisioners/heartbeats/controller/resources/pod.go b/pkg/provisioners/heartbeats/controller/resources/pod.go index 9750b8c2ddf..9ca5077cdbd 100644 --- a/pkg/provisioners/heartbeats/controller/resources/pod.go +++ b/pkg/provisioners/heartbeats/controller/resources/pod.go @@ -27,7 +27,7 @@ import ( ) const ( - image = "gcr.io/plori-nicholss/cmd-f2e9e0e7e0a0f8a66b5b6f8d85f4c98b@sha256:76ae26763af3e8ce04f78a056de610ac6625b51fa9d6f72adb70788c9d3a9779" + image = "gcr.io/plori-nicholss/cmd-f2e9e0e7e0a0f8a66b5b6f8d85f4c98b@sha256:d64b5fd7b4c8c5fdc70cca7732fcf384d3069a4a1762a69e58d701a040345aed" ) func MakePod(source *v1alpha1.Source, org *corev1.Pod, channel *v1alpha1.Channel, args *HeartBeatArguments) (*corev1.Pod, error) { @@ -52,11 +52,14 @@ func MakePod(source *v1alpha1.Source, org *corev1.Pod, channel *v1alpha1.Channel Kind: "Pod", }, ObjectMeta: metav1.ObjectMeta{ - Name: args.Name, - Namespace: args.Namespace, + GenerateName: args.Name, + Namespace: args.Namespace, OwnerReferences: []metav1.OwnerReference{ *controller.NewControllerRef(source, false), }, + Annotations: map[string]string{ + "sidecar.istio.io/inject": "true", + }, }, Spec: corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyOnFailure, From 439a24ee4fbdc1ef530a194272f8b4a2950c7728 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 10 Oct 2018 15:41:47 -0700 Subject: [PATCH 04/10] love wins. --- .../heartbeats/controller/provider.go | 13 ++++ .../heartbeats/controller/reconcile.go | 78 +++++++++++++++++-- .../controller/resources/channel.go | 2 +- .../heartbeats/controller/resources/names.go | 27 ------- .../heartbeats/controller/resources/pod.go | 5 +- 5 files changed, 89 insertions(+), 36 deletions(-) delete mode 100644 pkg/provisioners/heartbeats/controller/resources/names.go diff --git a/pkg/provisioners/heartbeats/controller/provider.go b/pkg/provisioners/heartbeats/controller/provider.go index 0629a030f53..bf06b8b7552 100644 --- a/pkg/provisioners/heartbeats/controller/provider.go +++ b/pkg/provisioners/heartbeats/controller/provider.go @@ -18,6 +18,7 @@ package controller import ( "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" @@ -62,6 +63,18 @@ func ProvideController(mgr manager.Manager) (controller.Controller, error) { return nil, err } + // Watch Channels and enqueue owning Source key. + if err := c.Watch(&source.Kind{Type: &v1alpha1.Channel{}}, + &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.Source{}, IsController: true}); err != nil { + return nil, err + } + + // Watch Pods and enqueue owning Source key. + if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, + &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.Source{}, IsController: true}); err != nil { + return nil, err + } + return c, nil } diff --git a/pkg/provisioners/heartbeats/controller/reconcile.go b/pkg/provisioners/heartbeats/controller/reconcile.go index 9cb59c99062..0a4ca1eea7b 100644 --- a/pkg/provisioners/heartbeats/controller/reconcile.go +++ b/pkg/provisioners/heartbeats/controller/reconcile.go @@ -19,13 +19,15 @@ package controller import ( "context" "encoding/json" - "fmt" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners/heartbeats/controller/resources" "github.com/knative/pkg/logging" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" ) @@ -105,9 +107,9 @@ func (r *reconciler) reconcile(ctx context.Context, source *v1alpha1.Source) err args.Name = source.Name args.Namespace = source.Namespace - channel := &v1alpha1.Channel{} - if err := r.client.Get(ctx, client.ObjectKey{Namespace: args.Namespace, Name: args.Name}, channel); err != nil { - fqn := fmt.Sprintf("%s/%s", channel.Kind, channel.APIVersion) + channel, err := r.getChannel(ctx, source) + if err != nil { + fqn := "Channel.eventing.knative.dev/v1alpha1" if errors.IsNotFound(err) { channel, err = r.createChannel(ctx, source, nil, args) if err != nil { @@ -129,9 +131,9 @@ func (r *reconciler) reconcile(ctx context.Context, source *v1alpha1.Source) err } - pod := &corev1.Pod{} - if err := r.client.Get(ctx, client.ObjectKey{Namespace: args.Namespace, Name: args.Name}, pod); err != nil { - fqn := fmt.Sprintf("%s/%s", pod.Kind, pod.APIVersion) + pod, err := r.getPod(ctx, source) + if err != nil { + fqn := "Pod.core/v1" if errors.IsNotFound(err) { pod, err = r.createPod(ctx, source, nil, channel, args) if err != nil { @@ -174,6 +176,37 @@ func (r *reconciler) updateStatus(source *v1alpha1.Source) (*v1alpha1.Source, er return newSource, nil } +func (r *reconciler) getChannel(ctx context.Context, source *v1alpha1.Source) (*v1alpha1.Channel, error) { + logger := logging.FromContext(ctx) + + list := &v1alpha1.ChannelList{} + err := r.client.List( + ctx, + &client.ListOptions{ + Namespace: source.Namespace, + LabelSelector: labels.Everything(), + // TODO this is here because the fake client needs it. Remove this when it's no longer + // needed. + Raw: &metav1.ListOptions{ + TypeMeta: metav1.TypeMeta{ + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + }, + }, + }, + list) + if err != nil { + logger.Errorf("Unable to list channels: %v", err) + return nil, err + } + for _, c := range list.Items { + if metav1.IsControlledBy(&c, source) { + return &c, nil + } + } + return nil, errors.NewNotFound(schema.GroupResource{}, "") +} + func (r *reconciler) createChannel(ctx context.Context, source *v1alpha1.Source, org *v1alpha1.Channel, args *resources.HeartBeatArguments) (*v1alpha1.Channel, error) { channel, err := resources.MakeChannel(source, org, args) if err != nil { @@ -186,6 +219,37 @@ func (r *reconciler) createChannel(ctx context.Context, source *v1alpha1.Source, return channel, nil } +func (r *reconciler) getPod(ctx context.Context, source *v1alpha1.Source) (*corev1.Pod, error) { + logger := logging.FromContext(ctx) + + list := &corev1.PodList{} + err := r.client.List( + ctx, + &client.ListOptions{ + Namespace: source.Namespace, + LabelSelector: labels.Everything(), + // TODO this is here because the fake client needs it. Remove this when it's no longer + // needed. + Raw: &metav1.ListOptions{ + TypeMeta: metav1.TypeMeta{ + APIVersion: corev1.SchemeGroupVersion.String(), + Kind: "Pod", + }, + }, + }, + list) + if err != nil { + logger.Errorf("Unable to list pods: %v", err) + return nil, err + } + for _, c := range list.Items { + if metav1.IsControlledBy(&c, source) { + return &c, nil + } + } + return nil, errors.NewNotFound(schema.GroupResource{}, "") +} + func (r *reconciler) createPod(ctx context.Context, source *v1alpha1.Source, org *corev1.Pod, channel *v1alpha1.Channel, args *resources.HeartBeatArguments) (*corev1.Pod, error) { pod, err := resources.MakePod(source, org, channel, args) if err != nil { diff --git a/pkg/provisioners/heartbeats/controller/resources/channel.go b/pkg/provisioners/heartbeats/controller/resources/channel.go index a0be782c0b1..b92d34420d0 100644 --- a/pkg/provisioners/heartbeats/controller/resources/channel.go +++ b/pkg/provisioners/heartbeats/controller/resources/channel.go @@ -31,7 +31,7 @@ func MakeChannel(source *v1alpha1.Source, org *v1alpha1.Channel, args *HeartBeat Kind: "Channel", }, ObjectMeta: metav1.ObjectMeta{ - GenerateName: args.Name, + GenerateName: args.Name + "-", Namespace: args.Namespace, OwnerReferences: []metav1.OwnerReference{ *controller.NewControllerRef(source, false), diff --git a/pkg/provisioners/heartbeats/controller/resources/names.go b/pkg/provisioners/heartbeats/controller/resources/names.go deleted file mode 100644 index 3b19493ddc8..00000000000 --- a/pkg/provisioners/heartbeats/controller/resources/names.go +++ /dev/null @@ -1,27 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package resources - -// ChannelName returns the name of the Channel for this source. -func ChannelName(args *HeartBeatArguments) string { - return args.Name -} - -// PodName returns the name of the Pod for this source. -func PodName(args *HeartBeatArguments) string { - return args.Name -} diff --git a/pkg/provisioners/heartbeats/controller/resources/pod.go b/pkg/provisioners/heartbeats/controller/resources/pod.go index 9ca5077cdbd..ccbe108fcfa 100644 --- a/pkg/provisioners/heartbeats/controller/resources/pod.go +++ b/pkg/provisioners/heartbeats/controller/resources/pod.go @@ -52,7 +52,7 @@ func MakePod(source *v1alpha1.Source, org *corev1.Pod, channel *v1alpha1.Channel Kind: "Pod", }, ObjectMeta: metav1.ObjectMeta{ - GenerateName: args.Name, + GenerateName: args.Name + "-", Namespace: args.Namespace, OwnerReferences: []metav1.OwnerReference{ *controller.NewControllerRef(source, false), @@ -60,6 +60,9 @@ func MakePod(source *v1alpha1.Source, org *corev1.Pod, channel *v1alpha1.Channel Annotations: map[string]string{ "sidecar.istio.io/inject": "true", }, + Labels: map[string]string{ + //"provisonereventing.knative.dev" + }, }, Spec: corev1.PodSpec{ RestartPolicy: corev1.RestartPolicyOnFailure, From 1f7d23146814a86a63040447e883abf8cb5f19c3 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 10 Oct 2018 15:49:47 -0700 Subject: [PATCH 05/10] heart. --- pkg/provisioners/heartbeats/config/examples/source.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/provisioners/heartbeats/config/examples/source.yaml b/pkg/provisioners/heartbeats/config/examples/source.yaml index 41bf5e99982..e5b29856fc9 100644 --- a/pkg/provisioners/heartbeats/config/examples/source.yaml +++ b/pkg/provisioners/heartbeats/config/examples/source.yaml @@ -8,5 +8,5 @@ spec: ref: name: heartbeats arguments: - label: "grant hello" + label: "<3" period: 2 From c43236a748634d5aa469ce16f537c0d5a6f35851 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Thu, 11 Oct 2018 10:43:11 -0700 Subject: [PATCH 06/10] ambicontroller. --- pkg/provisioners/ambicontroller/interface.go | 37 ++++ pkg/provisioners/ambicontroller/provider.go | 136 ++++++++++++ pkg/provisioners/meta/heartbeats/handler.go | 222 +++++++++++++++++++ pkg/provisioners/meta/sdk/controller.go | 120 ++++++++++ 4 files changed, 515 insertions(+) create mode 100644 pkg/provisioners/ambicontroller/interface.go create mode 100644 pkg/provisioners/ambicontroller/provider.go create mode 100644 pkg/provisioners/meta/heartbeats/handler.go create mode 100644 pkg/provisioners/meta/sdk/controller.go diff --git a/pkg/provisioners/ambicontroller/interface.go b/pkg/provisioners/ambicontroller/interface.go new file mode 100644 index 00000000000..e9f4219cde6 --- /dev/null +++ b/pkg/provisioners/ambicontroller/interface.go @@ -0,0 +1,37 @@ +package sdk + +import ( + "context" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type Reconciler interface { + // takes a byte[] and VersionKind + Convert(context.Context, schema.GroupVersionKind, []byte) (runtime.Object, error) + // Takes in the parent and children, returns the parent status and requested children state. + Reconcile(context.Context, runtime.Object, []runtime.Object) (interface{}, []runtime.Object, error) +} + +type Controller interface { + Handle(context.Context, *Request) (*Response, error) +} + +type Request struct { + // Controller is the overall context for the reconciler. + Controller unstructured.Unstructured `json:"controller"` + // Parent is the object under reconciliation. + Parent unstructured.Unstructured `json:"parent"` + // Children is a map where the key is gvk and the value is a ChildrenGroup + Children map[string]ChildrenGroup `json:"children"` + Finalizing bool `json:"finalizing"` +} + +// ChildrenGroup is a map of resource names to a resource. +type ChildrenGroup map[string]unstructured.Unstructured + +type Response struct { + Status unstructured.Unstructured `json:"status"` + Children []unstructured.Unstructured `json:"children"` +} diff --git a/pkg/provisioners/ambicontroller/provider.go b/pkg/provisioners/ambicontroller/provider.go new file mode 100644 index 00000000000..c14d635f98c --- /dev/null +++ b/pkg/provisioners/ambicontroller/provider.go @@ -0,0 +1,136 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +const ( + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "heartbeats-provisioner-controller" +) + +type reconciler struct { + client client.Client + restConfig *rest.Config + dynamicClient dynamic.Interface + recorder record.EventRecorder +} + +// Verify the struct implements reconcile.Reconciler +var _ reconcile.Reconciler = &reconciler{} + +// ProvideController returns a Subscription controller. +func ProvideController(mgr manager.Manager) (controller.Controller, error) { + // Setup a new controller to Reconcile Subscriptions. + c, err := controller.New(controllerAgentName, mgr, controller.Options{ + Reconciler: &reconciler{ + recorder: mgr.GetRecorder(controllerAgentName), + }, + }) + if err != nil { + return nil, err + } + + // Watch Source events and enqueue Source object key. + if err := c.Watch(&source.Kind{Type: &v1alpha1.Source{}}, &handler.EnqueueRequestForObject{}); err != nil { + return nil, err + } + + // Watch Channels and enqueue owning Source key. + if err := c.Watch(&source.Kind{Type: &v1alpha1.Channel{}}, + &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.Source{}, IsController: true}); err != nil { + return nil, err + } + + // Watch Pods and enqueue owning Source key. + if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, + &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.Source{}, IsController: true}); err != nil { + return nil, err + } + + return c, nil +} + +func (r *reconciler) InjectClient(c client.Client) error { + r.client = c + return nil +} + +func (r *reconciler) InjectConfig(c *rest.Config) error { + r.restConfig = c + var err error + r.dynamicClient, err = dynamic.NewForConfig(c) + return err +} + +func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + ctx := context.TODO() + logger := logging.FromContext(ctx) + + logger.Infof("Reconciling source %v", request) + source := &v1alpha1.Source{} + err := r.client.Get(context.TODO(), request.NamespacedName, source) + + if errors.IsNotFound(err) { + logger.Errorf("could not find source %v\n", request) + return reconcile.Result{}, nil + } + + if err != nil { + logger.Errorf("could not fetch Source %v for %+v\n", err, request) + return reconcile.Result{}, err + } + + if source.Spec.Provisioner.Ref.Name != provisionerName { + logger.Errorf("heartbeats skipping source %s, provisioned by %s\n", source.Name, source.Spec.Provisioner.Ref.Name) + return reconcile.Result{}, nil + } + + original := source.DeepCopy() + + // Reconcile this copy of the Source and then write back any status + // updates regardless of whether the reconcile error out. + err = r.reconcile(ctx, source) + if err != nil { + logger.Warnf("Failed to reconcile source: %v", err) + } + if equality.Semantic.DeepEqual(original.Status, source.Status) { + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the informer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + } else if _, err := r.updateStatus(source); err != nil { + logger.Warnf("Failed to update source status: %v", err) + return reconcile.Result{}, err + } + + // Requeue if the resource is not ready: + return reconcile.Result{}, err +} diff --git a/pkg/provisioners/meta/heartbeats/handler.go b/pkg/provisioners/meta/heartbeats/handler.go new file mode 100644 index 00000000000..08c254ee7c8 --- /dev/null +++ b/pkg/provisioners/meta/heartbeats/handler.go @@ -0,0 +1,222 @@ +package heartbeats + +import ( + "fmt" + "reflect" +) + +/* + +Depended on + +apiVersion: metacontroller.k8s.io/v1alpha1 +kind: CompositeController +metadata: + name: heartbeats-controller +spec: + generateSelector: true + parentResource: + apiVersion: eventing.knative.dev/v1alpha1 + resource: sources + childResources: + - apiVersion: eventing.knative.dev/v1alpha1 + resource: channels + - apiVersion: v1 + resource: pods + hooks: + sync: + webhook: + url: http://heartbeats-provisioner.default/ + timeout: 30s + + +*/ + +const ( + //image = "github.com/n3wscott/metacontroller-go-sdk/cmd/heartbeats" + image = "gcr.io/plori-nicholss/heartbeats-83786fa49704c68848513f3f5ee000df@sha256:9be8ba27e2daa4df91fdfe3006a1a99c022663a031d597adbe019b45fca28c77" +) + +type HeartBeatArguments struct { + Name string `json:"name"` + Label string `json:"label"` + Period int `json:"period"` +} + +type handler struct{} + +func NewHandler() sdk.Handler { + return &handler{} +} + +func (h *handler) Convert(ctx context.Context, versionKind string, body []byte) (runtime.Object, error) { + log.Printf("Converting %s", versionKind) + if versionKind == "Channel.eventing.knative.dev/v1alpha1" { + channel := &eventingv1alpha1.Channel{} + if err := json.Unmarshal(body, channel); err != nil { + return nil, err + } + return channel, nil + } + if versionKind == "Pod.v1" { + pod := &corev1.Pod{} + if err := json.Unmarshal(body, pod); err != nil { + return nil, err + } + return pod, nil + } + return nil, fmt.Errorf("unknown verison kind: %s", versionKind) +} + +func (h *handler) Handle(ctx context.Context, source eventingv1alpha1.Source, c []runtime.Object) (*eventingv1alpha1.SourceStatus, []runtime.Object, error) { + + log.Printf("Starting Handle for: %s", source.Name) + + args := &HeartBeatArguments{} + + if source.Spec.Arguments != nil { + if err := json.Unmarshal(source.Spec.Arguments.Raw, args); err != nil { + log.Printf("Error: %s failed to unmarshal arguments, %v", source.Name, err) + } + } + args.Name = source.Name + + var orgChannel *eventingv1alpha1.Channel + var orgPod *corev1.Pod + //c[0].GetObjectKind().GroupVersionKind().Kind <-- todo could do it this way. + for _, obj := range c { + log.Printf("obj is a kind %s", reflect.TypeOf(obj)) + if orgObj, ok := obj.(*eventingv1alpha1.Channel); ok { + orgChannel = orgObj + } + if orgObj, ok := obj.(*corev1.Pod); ok { + orgPod = orgObj + } + } + + channel := makeChannel(orgChannel, args) + pod := makePod(orgPod, orgChannel, args) + + status := updateStatus(source.Status, orgChannel, channel, orgPod, pod) + + var children []runtime.Object + if channel != nil { + children = append(children, channel) + } + if pod != nil { + children = append(children, pod) + } + + return status, children, nil +} + +func updateStatus(org eventingv1alpha1.SourceStatus, orgChan, channel *eventingv1alpha1.Channel, orgPod, pod *corev1.Pod) *eventingv1alpha1.SourceStatus { + status := org.DeepCopy() + + if orgChan != nil && orgPod != nil { + c := orgChan.Status.GetCondition(eventingv1alpha1.ChannelConditionReady) + if c.IsTrue() && orgPod != nil { // TODO:: could look at pod status. + status.MarkProvisioned() + return status + } + } + + cM := "" + pM := "" + if orgChan == nil { + if channel == nil { + cM = "Channel nil" + } else { + cM = "Channel created" + } + } else { + c := orgChan.Status.GetCondition(eventingv1alpha1.ChannelConditionReady) + cM = fmt.Sprintf("Channel.isReady: %v", c.IsTrue()) + } + + if orgPod == nil { + if pod == nil { + pM = "Pod nil" + } else { + pM = "Pod created" + } + } else { + pM = fmt.Sprintf("Pod.isReady: %v", orgPod != nil) + } + status.MarkDeprovisioned("Provisioning", "%s; %s", cM, pM) + return status +} + +func makeChannel(org *eventingv1alpha1.Channel, args *HeartBeatArguments) *eventingv1alpha1.Channel { + channel := &eventingv1alpha1.Channel{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Channel", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: args.Name + "-chan", + }, + Spec: eventingv1alpha1.ChannelSpec{ + Provisioner: &eventingv1alpha1.ProvisionerReference{ + Ref: &corev1.ObjectReference{ + Name: "in-memory-bus-provisioner", + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "ClusterProvisioner", + }, + }, + Channelable: &duckv1alpha1.Channelable{ + Subscribers: []duckv1alpha1.ChannelSubscriberSpec{{ + SinkableDomain: "message-dumper.default.svc.cluster.local", + }}, + }, + }, + } + if org != nil { + channel.Spec.Generation = org.Spec.Generation + } + return channel +} + +func makePod(org *corev1.Pod, channel *eventingv1alpha1.Channel, args *HeartBeatArguments) *corev1.Pod { + + if channel == nil || channel.Status.Sinkable.DomainInternal == "" { + log.Printf("channel: %v", channel) + return nil + } else { + log.Printf("channel: %v", channel) + } + + remote := fmt.Sprintf("--remote=http://%s", channel.Status.Sinkable.DomainInternal) + period := "" + if args.Period > 0 { + period = fmt.Sprintf("--period=%d", args.Period) + } + label := "" + if args.Label != "" { + period = fmt.Sprintf("--label=%s", args.Label) + } + + pod := &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: args.Name + "-er", + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyOnFailure, + Containers: []corev1.Container{ + { + Name: "heartbeat", + Image: image, + Args: []string{ + remote, period, label, + }, + }, + }, + }, + } + + return pod +} diff --git a/pkg/provisioners/meta/sdk/controller.go b/pkg/provisioners/meta/sdk/controller.go new file mode 100644 index 00000000000..80c6f46c781 --- /dev/null +++ b/pkg/provisioners/meta/sdk/controller.go @@ -0,0 +1,120 @@ +package sdk + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "io/ioutil" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "log" + "net/http" + + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" +) + +var ( + port int + ctx context.Context + handler Handler +) + +type Handler interface { + Convert(context.Context, string, []byte) (runtime.Object, error) + Handle(context.Context, eventingv1alpha1.Source, []runtime.Object) (*eventingv1alpha1.SourceStatus, []runtime.Object, error) +} + +type SyncRequest struct { + Controller *runtime.RawExtension `json:"controller"` + Parent eventingv1alpha1.Source `json:"parent"` + Children map[string]ChildrenGroup `json:"children"` // TODO not sure how to deal with children + Finalizing bool `json:"finalizing"` +} + +type ChildrenGroup map[string]unstructured.Unstructured + +type SyncResponse struct { + Status eventingv1alpha1.SourceStatus `json:"status"` + Children []runtime.Object `json:"children"` +} + +func Handle(h Handler) { + handler = h +} + +func httpHandler(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + + log.Printf("Sync: %s", string(body)) + + if err != nil { + log.Printf("Error: failed to read all body, %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + request := &SyncRequest{} + if err := json.Unmarshal(body, request); err != nil { + log.Printf("Error: failed to unmarshal request, %v", err) + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + children := make([]runtime.Object, 0, len(request.Children)) // this is wrong, .children is the size of the map, not the objects. But it will scale up + + for k, cg := range request.Children { + for _, child := range cg { + cjson, err := child.MarshalJSON() + if err != nil { + log.Printf("Error: failed to marshal json from child %v, %v", child, err) + continue + } + newChild, err := handler.Convert(ctx, k, cjson) + if err != nil { + log.Printf("Error: failed to convert child json %q to runtime object, %v", string(cjson), err) + continue + } + log.Printf("converted a child: %v", newChild) + children = append(children, newChild) + } + } + + status, newChildren, err := handler.Handle(ctx, request.Parent, children) // TODO convert the children to a flat list. + if err != nil { + log.Printf("Error: failed to handle request, %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + body, err = json.Marshal(&SyncResponse{ + Status: *status, + Children: newChildren, + }) + if err != nil { + log.Printf("Error: failed to marshel response, %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + w.Write(body) +} + +func Run(c context.Context) { + flag.Parse() + + ctx = c + http.HandleFunc("/", httpHandler) + + server := &http.Server{Addr: fmt.Sprintf(":%d", port)} + go func() { + log.Fatal(server.ListenAndServe()) + }() + + // Shutdown on SIGTERM. + sig := <-ctx.Done() + log.Printf("Received %v signal. Shutting down...", sig) + server.Shutdown(ctx) +} + +func init() { + flag.IntVar(&port, "port", 80, "The port to listen on") +} From 518b0df0f15dde00ebbb939b63b880c88147a8f0 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Fri, 12 Oct 2018 08:39:38 -0700 Subject: [PATCH 07/10] Looking at abstracting some of the reconciler out. Not quite there yet. --- pkg/provisioners/ambicontroller/interface.go | 37 --- pkg/provisioners/ambicontroller/provider.go | 136 --------- .../heartbeats/controller/provider.go | 36 +-- .../heartbeats/controller/reconcile.go | 75 +---- pkg/provisioners/meta/heartbeats/handler.go | 222 -------------- pkg/provisioners/meta/sdk/controller.go | 120 -------- pkg/provisioners/sdk/provider.go | 282 ++++++++++++++++++ 7 files changed, 300 insertions(+), 608 deletions(-) delete mode 100644 pkg/provisioners/ambicontroller/interface.go delete mode 100644 pkg/provisioners/ambicontroller/provider.go delete mode 100644 pkg/provisioners/meta/heartbeats/handler.go delete mode 100644 pkg/provisioners/meta/sdk/controller.go create mode 100644 pkg/provisioners/sdk/provider.go diff --git a/pkg/provisioners/ambicontroller/interface.go b/pkg/provisioners/ambicontroller/interface.go deleted file mode 100644 index e9f4219cde6..00000000000 --- a/pkg/provisioners/ambicontroller/interface.go +++ /dev/null @@ -1,37 +0,0 @@ -package sdk - -import ( - "context" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" -) - -type Reconciler interface { - // takes a byte[] and VersionKind - Convert(context.Context, schema.GroupVersionKind, []byte) (runtime.Object, error) - // Takes in the parent and children, returns the parent status and requested children state. - Reconcile(context.Context, runtime.Object, []runtime.Object) (interface{}, []runtime.Object, error) -} - -type Controller interface { - Handle(context.Context, *Request) (*Response, error) -} - -type Request struct { - // Controller is the overall context for the reconciler. - Controller unstructured.Unstructured `json:"controller"` - // Parent is the object under reconciliation. - Parent unstructured.Unstructured `json:"parent"` - // Children is a map where the key is gvk and the value is a ChildrenGroup - Children map[string]ChildrenGroup `json:"children"` - Finalizing bool `json:"finalizing"` -} - -// ChildrenGroup is a map of resource names to a resource. -type ChildrenGroup map[string]unstructured.Unstructured - -type Response struct { - Status unstructured.Unstructured `json:"status"` - Children []unstructured.Unstructured `json:"children"` -} diff --git a/pkg/provisioners/ambicontroller/provider.go b/pkg/provisioners/ambicontroller/provider.go deleted file mode 100644 index c14d635f98c..00000000000 --- a/pkg/provisioners/ambicontroller/provider.go +++ /dev/null @@ -1,136 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - corev1 "k8s.io/api/core/v1" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/record" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" -) - -const ( - // controllerAgentName is the string used by this controller to identify - // itself when creating events. - controllerAgentName = "heartbeats-provisioner-controller" -) - -type reconciler struct { - client client.Client - restConfig *rest.Config - dynamicClient dynamic.Interface - recorder record.EventRecorder -} - -// Verify the struct implements reconcile.Reconciler -var _ reconcile.Reconciler = &reconciler{} - -// ProvideController returns a Subscription controller. -func ProvideController(mgr manager.Manager) (controller.Controller, error) { - // Setup a new controller to Reconcile Subscriptions. - c, err := controller.New(controllerAgentName, mgr, controller.Options{ - Reconciler: &reconciler{ - recorder: mgr.GetRecorder(controllerAgentName), - }, - }) - if err != nil { - return nil, err - } - - // Watch Source events and enqueue Source object key. - if err := c.Watch(&source.Kind{Type: &v1alpha1.Source{}}, &handler.EnqueueRequestForObject{}); err != nil { - return nil, err - } - - // Watch Channels and enqueue owning Source key. - if err := c.Watch(&source.Kind{Type: &v1alpha1.Channel{}}, - &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.Source{}, IsController: true}); err != nil { - return nil, err - } - - // Watch Pods and enqueue owning Source key. - if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, - &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.Source{}, IsController: true}); err != nil { - return nil, err - } - - return c, nil -} - -func (r *reconciler) InjectClient(c client.Client) error { - r.client = c - return nil -} - -func (r *reconciler) InjectConfig(c *rest.Config) error { - r.restConfig = c - var err error - r.dynamicClient, err = dynamic.NewForConfig(c) - return err -} - -func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { - ctx := context.TODO() - logger := logging.FromContext(ctx) - - logger.Infof("Reconciling source %v", request) - source := &v1alpha1.Source{} - err := r.client.Get(context.TODO(), request.NamespacedName, source) - - if errors.IsNotFound(err) { - logger.Errorf("could not find source %v\n", request) - return reconcile.Result{}, nil - } - - if err != nil { - logger.Errorf("could not fetch Source %v for %+v\n", err, request) - return reconcile.Result{}, err - } - - if source.Spec.Provisioner.Ref.Name != provisionerName { - logger.Errorf("heartbeats skipping source %s, provisioned by %s\n", source.Name, source.Spec.Provisioner.Ref.Name) - return reconcile.Result{}, nil - } - - original := source.DeepCopy() - - // Reconcile this copy of the Source and then write back any status - // updates regardless of whether the reconcile error out. - err = r.reconcile(ctx, source) - if err != nil { - logger.Warnf("Failed to reconcile source: %v", err) - } - if equality.Semantic.DeepEqual(original.Status, source.Status) { - // If we didn't change anything then don't call updateStatus. - // This is important because the copy we loaded from the informer's - // cache may be stale and we don't want to overwrite a prior update - // to status with this stale state. - } else if _, err := r.updateStatus(source); err != nil { - logger.Warnf("Failed to update source status: %v", err) - return reconcile.Result{}, err - } - - // Requeue if the resource is not ready: - return reconcile.Result{}, err -} diff --git a/pkg/provisioners/heartbeats/controller/provider.go b/pkg/provisioners/heartbeats/controller/provider.go index bf06b8b7552..c572951c379 100644 --- a/pkg/provisioners/heartbeats/controller/provider.go +++ b/pkg/provisioners/heartbeats/controller/provider.go @@ -18,16 +18,15 @@ package controller import ( "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/provisioners/sdk" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" - "sigs.k8s.io/controller-runtime/pkg/source" ) const ( @@ -43,39 +42,18 @@ type reconciler struct { recorder record.EventRecorder } -// Verify the struct implements reconcile.Reconciler -var _ reconcile.Reconciler = &reconciler{} - // ProvideController returns a Subscription controller. func ProvideController(mgr manager.Manager) (controller.Controller, error) { - // Setup a new controller to Reconcile Subscriptions. - c, err := controller.New(controllerAgentName, mgr, controller.Options{ + p := &sdk.Provider{ + AgentName: controllerAgentName, + Parent: &v1alpha1.Source{}, + Owns: []runtime.Object{&corev1.Pod{}, &v1alpha1.Channel{}}, Reconciler: &reconciler{ recorder: mgr.GetRecorder(controllerAgentName), }, - }) - if err != nil { - return nil, err - } - - // Watch Source events and enqueue Source object key. - if err := c.Watch(&source.Kind{Type: &v1alpha1.Source{}}, &handler.EnqueueRequestForObject{}); err != nil { - return nil, err - } - - // Watch Channels and enqueue owning Source key. - if err := c.Watch(&source.Kind{Type: &v1alpha1.Channel{}}, - &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.Source{}, IsController: true}); err != nil { - return nil, err - } - - // Watch Pods and enqueue owning Source key. - if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, - &handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.Source{}, IsController: true}); err != nil { - return nil, err } - return c, nil + return p.ProvideController(mgr) } func (r *reconciler) InjectClient(c client.Client) error { diff --git a/pkg/provisioners/heartbeats/controller/reconcile.go b/pkg/provisioners/heartbeats/controller/reconcile.go index 0a4ca1eea7b..131311f749b 100644 --- a/pkg/provisioners/heartbeats/controller/reconcile.go +++ b/pkg/provisioners/heartbeats/controller/reconcile.go @@ -23,13 +23,12 @@ import ( "github.com/knative/eventing/pkg/provisioners/heartbeats/controller/resources" "github.com/knative/pkg/logging" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/reconcile" ) const ( @@ -38,53 +37,20 @@ const ( // Reconcile compares the actual state with the desired, and attempts to // converge the two. -func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { - ctx := context.TODO() - logger := logging.FromContext(ctx) - - logger.Infof("Reconciling source %v", request) - source := &v1alpha1.Source{} - err := r.client.Get(context.TODO(), request.NamespacedName, source) - if errors.IsNotFound(err) { - logger.Errorf("could not find source %v\n", request) - return reconcile.Result{}, nil - } +func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) (runtime.Object, error) { + logger := logging.FromContext(ctx) - if err != nil { - logger.Errorf("could not fetch Source %v for %+v\n", err, request) - return reconcile.Result{}, err + source, ok := object.(*v1alpha1.Source) + if !ok { + logger.Errorf("could not find source %v\n", object) + return object, nil } if source.Spec.Provisioner.Ref.Name != provisionerName { logger.Errorf("heartbeats skipping source %s, provisioned by %s\n", source.Name, source.Spec.Provisioner.Ref.Name) - return reconcile.Result{}, nil - } - - original := source.DeepCopy() - - // Reconcile this copy of the Source and then write back any status - // updates regardless of whether the reconcile error out. - err = r.reconcile(ctx, source) - if err != nil { - logger.Warnf("Failed to reconcile source: %v", err) + return source, nil } - if equality.Semantic.DeepEqual(original.Status, source.Status) { - // If we didn't change anything then don't call updateStatus. - // This is important because the copy we loaded from the informer's - // cache may be stale and we don't want to overwrite a prior update - // to status with this stale state. - } else if _, err := r.updateStatus(source); err != nil { - logger.Warnf("Failed to update source status: %v", err) - return reconcile.Result{}, err - } - - // Requeue if the resource is not ready: - return reconcile.Result{}, err -} - -func (r *reconciler) reconcile(ctx context.Context, source *v1alpha1.Source) error { - logger := logging.FromContext(ctx) source.Status.InitializeConditions() @@ -113,7 +79,7 @@ func (r *reconciler) reconcile(ctx context.Context, source *v1alpha1.Source) err if errors.IsNotFound(err) { channel, err = r.createChannel(ctx, source, nil, args) if err != nil { - return err + return object, err } r.recorder.Eventf(source, corev1.EventTypeNormal, "Provisioned", "Created channel %q", channel.Name) source.Status.SetProvisionedObjectState(channel.Name, fqn, "Created", "Created channel %q", channel.Name) @@ -139,7 +105,7 @@ func (r *reconciler) reconcile(ctx context.Context, source *v1alpha1.Source) err if err != nil { r.recorder.Eventf(source, corev1.EventTypeNormal, "Blocked", "waiting for %v", err) source.Status.SetProvisionedObjectState(args.Name, fqn, "Blocked", "waiting for %v", args.Name, err) - return err + return object, err } r.recorder.Eventf(source, corev1.EventTypeNormal, "Provisioned", "Created pod %q", pod.Name) source.Status.SetProvisionedObjectState(pod.Name, fqn, "Created", "Created pod %q", pod.Name) @@ -154,26 +120,7 @@ func (r *reconciler) reconcile(ctx context.Context, source *v1alpha1.Source) err // TODO: need informers for the object we are controlling - return nil -} - -func (r *reconciler) updateStatus(source *v1alpha1.Source) (*v1alpha1.Source, error) { - newSource := &v1alpha1.Source{} - err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: source.Namespace, Name: source.Name}, newSource) - - if err != nil { - return nil, err - } - newSource.Status = source.Status - - // Until #38113 is merged, we must use Update instead of UpdateStatus to - // update the Status block of the Source resource. UpdateStatus will not - // allow changes to the Spec of the resource, which is ideal for ensuring - // nothing other than resource status has been updated. - if err = r.client.Update(context.TODO(), newSource); err != nil { - return nil, err - } - return newSource, nil + return source, nil } func (r *reconciler) getChannel(ctx context.Context, source *v1alpha1.Source) (*v1alpha1.Channel, error) { diff --git a/pkg/provisioners/meta/heartbeats/handler.go b/pkg/provisioners/meta/heartbeats/handler.go deleted file mode 100644 index 08c254ee7c8..00000000000 --- a/pkg/provisioners/meta/heartbeats/handler.go +++ /dev/null @@ -1,222 +0,0 @@ -package heartbeats - -import ( - "fmt" - "reflect" -) - -/* - -Depended on - -apiVersion: metacontroller.k8s.io/v1alpha1 -kind: CompositeController -metadata: - name: heartbeats-controller -spec: - generateSelector: true - parentResource: - apiVersion: eventing.knative.dev/v1alpha1 - resource: sources - childResources: - - apiVersion: eventing.knative.dev/v1alpha1 - resource: channels - - apiVersion: v1 - resource: pods - hooks: - sync: - webhook: - url: http://heartbeats-provisioner.default/ - timeout: 30s - - -*/ - -const ( - //image = "github.com/n3wscott/metacontroller-go-sdk/cmd/heartbeats" - image = "gcr.io/plori-nicholss/heartbeats-83786fa49704c68848513f3f5ee000df@sha256:9be8ba27e2daa4df91fdfe3006a1a99c022663a031d597adbe019b45fca28c77" -) - -type HeartBeatArguments struct { - Name string `json:"name"` - Label string `json:"label"` - Period int `json:"period"` -} - -type handler struct{} - -func NewHandler() sdk.Handler { - return &handler{} -} - -func (h *handler) Convert(ctx context.Context, versionKind string, body []byte) (runtime.Object, error) { - log.Printf("Converting %s", versionKind) - if versionKind == "Channel.eventing.knative.dev/v1alpha1" { - channel := &eventingv1alpha1.Channel{} - if err := json.Unmarshal(body, channel); err != nil { - return nil, err - } - return channel, nil - } - if versionKind == "Pod.v1" { - pod := &corev1.Pod{} - if err := json.Unmarshal(body, pod); err != nil { - return nil, err - } - return pod, nil - } - return nil, fmt.Errorf("unknown verison kind: %s", versionKind) -} - -func (h *handler) Handle(ctx context.Context, source eventingv1alpha1.Source, c []runtime.Object) (*eventingv1alpha1.SourceStatus, []runtime.Object, error) { - - log.Printf("Starting Handle for: %s", source.Name) - - args := &HeartBeatArguments{} - - if source.Spec.Arguments != nil { - if err := json.Unmarshal(source.Spec.Arguments.Raw, args); err != nil { - log.Printf("Error: %s failed to unmarshal arguments, %v", source.Name, err) - } - } - args.Name = source.Name - - var orgChannel *eventingv1alpha1.Channel - var orgPod *corev1.Pod - //c[0].GetObjectKind().GroupVersionKind().Kind <-- todo could do it this way. - for _, obj := range c { - log.Printf("obj is a kind %s", reflect.TypeOf(obj)) - if orgObj, ok := obj.(*eventingv1alpha1.Channel); ok { - orgChannel = orgObj - } - if orgObj, ok := obj.(*corev1.Pod); ok { - orgPod = orgObj - } - } - - channel := makeChannel(orgChannel, args) - pod := makePod(orgPod, orgChannel, args) - - status := updateStatus(source.Status, orgChannel, channel, orgPod, pod) - - var children []runtime.Object - if channel != nil { - children = append(children, channel) - } - if pod != nil { - children = append(children, pod) - } - - return status, children, nil -} - -func updateStatus(org eventingv1alpha1.SourceStatus, orgChan, channel *eventingv1alpha1.Channel, orgPod, pod *corev1.Pod) *eventingv1alpha1.SourceStatus { - status := org.DeepCopy() - - if orgChan != nil && orgPod != nil { - c := orgChan.Status.GetCondition(eventingv1alpha1.ChannelConditionReady) - if c.IsTrue() && orgPod != nil { // TODO:: could look at pod status. - status.MarkProvisioned() - return status - } - } - - cM := "" - pM := "" - if orgChan == nil { - if channel == nil { - cM = "Channel nil" - } else { - cM = "Channel created" - } - } else { - c := orgChan.Status.GetCondition(eventingv1alpha1.ChannelConditionReady) - cM = fmt.Sprintf("Channel.isReady: %v", c.IsTrue()) - } - - if orgPod == nil { - if pod == nil { - pM = "Pod nil" - } else { - pM = "Pod created" - } - } else { - pM = fmt.Sprintf("Pod.isReady: %v", orgPod != nil) - } - status.MarkDeprovisioned("Provisioning", "%s; %s", cM, pM) - return status -} - -func makeChannel(org *eventingv1alpha1.Channel, args *HeartBeatArguments) *eventingv1alpha1.Channel { - channel := &eventingv1alpha1.Channel{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "eventing.knative.dev/v1alpha1", - Kind: "Channel", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: args.Name + "-chan", - }, - Spec: eventingv1alpha1.ChannelSpec{ - Provisioner: &eventingv1alpha1.ProvisionerReference{ - Ref: &corev1.ObjectReference{ - Name: "in-memory-bus-provisioner", - APIVersion: "eventing.knative.dev/v1alpha1", - Kind: "ClusterProvisioner", - }, - }, - Channelable: &duckv1alpha1.Channelable{ - Subscribers: []duckv1alpha1.ChannelSubscriberSpec{{ - SinkableDomain: "message-dumper.default.svc.cluster.local", - }}, - }, - }, - } - if org != nil { - channel.Spec.Generation = org.Spec.Generation - } - return channel -} - -func makePod(org *corev1.Pod, channel *eventingv1alpha1.Channel, args *HeartBeatArguments) *corev1.Pod { - - if channel == nil || channel.Status.Sinkable.DomainInternal == "" { - log.Printf("channel: %v", channel) - return nil - } else { - log.Printf("channel: %v", channel) - } - - remote := fmt.Sprintf("--remote=http://%s", channel.Status.Sinkable.DomainInternal) - period := "" - if args.Period > 0 { - period = fmt.Sprintf("--period=%d", args.Period) - } - label := "" - if args.Label != "" { - period = fmt.Sprintf("--label=%s", args.Label) - } - - pod := &corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Pod", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: args.Name + "-er", - }, - Spec: corev1.PodSpec{ - RestartPolicy: corev1.RestartPolicyOnFailure, - Containers: []corev1.Container{ - { - Name: "heartbeat", - Image: image, - Args: []string{ - remote, period, label, - }, - }, - }, - }, - } - - return pod -} diff --git a/pkg/provisioners/meta/sdk/controller.go b/pkg/provisioners/meta/sdk/controller.go deleted file mode 100644 index 80c6f46c781..00000000000 --- a/pkg/provisioners/meta/sdk/controller.go +++ /dev/null @@ -1,120 +0,0 @@ -package sdk - -import ( - "context" - "encoding/json" - "flag" - "fmt" - "io/ioutil" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "log" - "net/http" - - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" -) - -var ( - port int - ctx context.Context - handler Handler -) - -type Handler interface { - Convert(context.Context, string, []byte) (runtime.Object, error) - Handle(context.Context, eventingv1alpha1.Source, []runtime.Object) (*eventingv1alpha1.SourceStatus, []runtime.Object, error) -} - -type SyncRequest struct { - Controller *runtime.RawExtension `json:"controller"` - Parent eventingv1alpha1.Source `json:"parent"` - Children map[string]ChildrenGroup `json:"children"` // TODO not sure how to deal with children - Finalizing bool `json:"finalizing"` -} - -type ChildrenGroup map[string]unstructured.Unstructured - -type SyncResponse struct { - Status eventingv1alpha1.SourceStatus `json:"status"` - Children []runtime.Object `json:"children"` -} - -func Handle(h Handler) { - handler = h -} - -func httpHandler(w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(r.Body) - - log.Printf("Sync: %s", string(body)) - - if err != nil { - log.Printf("Error: failed to read all body, %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - request := &SyncRequest{} - if err := json.Unmarshal(body, request); err != nil { - log.Printf("Error: failed to unmarshal request, %v", err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - children := make([]runtime.Object, 0, len(request.Children)) // this is wrong, .children is the size of the map, not the objects. But it will scale up - - for k, cg := range request.Children { - for _, child := range cg { - cjson, err := child.MarshalJSON() - if err != nil { - log.Printf("Error: failed to marshal json from child %v, %v", child, err) - continue - } - newChild, err := handler.Convert(ctx, k, cjson) - if err != nil { - log.Printf("Error: failed to convert child json %q to runtime object, %v", string(cjson), err) - continue - } - log.Printf("converted a child: %v", newChild) - children = append(children, newChild) - } - } - - status, newChildren, err := handler.Handle(ctx, request.Parent, children) // TODO convert the children to a flat list. - if err != nil { - log.Printf("Error: failed to handle request, %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - body, err = json.Marshal(&SyncResponse{ - Status: *status, - Children: newChildren, - }) - if err != nil { - log.Printf("Error: failed to marshel response, %v", err) - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - w.Header().Set("Content-Type", "application/json") - w.Write(body) -} - -func Run(c context.Context) { - flag.Parse() - - ctx = c - http.HandleFunc("/", httpHandler) - - server := &http.Server{Addr: fmt.Sprintf(":%d", port)} - go func() { - log.Fatal(server.ListenAndServe()) - }() - - // Shutdown on SIGTERM. - sig := <-ctx.Done() - log.Printf("Received %v signal. Shutting down...", sig) - server.Shutdown(ctx) -} - -func init() { - flag.IntVar(&port, "port", 80, "The port to listen on") -} diff --git a/pkg/provisioners/sdk/provider.go b/pkg/provisioners/sdk/provider.go new file mode 100644 index 00000000000..934ebd8dc77 --- /dev/null +++ b/pkg/provisioners/sdk/provider.go @@ -0,0 +1,282 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sdk + +import ( + "context" + "encoding/json" + "fmt" + "github.com/knative/pkg/logging" + "github.com/mattbaird/jsonpatch" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "reflect" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +type KnativeReconciler interface { + Reconcile(ctx context.Context, object runtime.Object) (runtime.Object, error) + InjectClient(c client.Client) error + InjectConfig(c *rest.Config) error +} + +type reconciler struct { + client client.Client + restConfig *rest.Config + dynamicClient dynamic.Interface + recorder record.EventRecorder + + provider Provider +} + +// Verify the struct implements reconcile.Reconciler +var _ reconcile.Reconciler = &reconciler{} + +type Provider struct { + AgentName string + // Parent is a resource kind to reconcile with empty content. i.e. &v1.Parent{} + Parent runtime.Object + // Owns are dependent resources owned by the parent for which changes to + // those resources cause the Parent to be re-reconciled. This is a list of + // resources of kind with empty content. i.e. [&v1.Child{}] + Owns []runtime.Object + + Reconciler KnativeReconciler +} + +// ProvideController returns a controller for controller-runtime. +func (p *Provider) ProvideController(mgr manager.Manager) (controller.Controller, error) { + // Setup a new controller to Reconcile Subscriptions. + c, err := controller.New(p.AgentName, mgr, controller.Options{ + Reconciler: &reconciler{ + recorder: mgr.GetRecorder(p.AgentName), + }, + }) + if err != nil { + return nil, err + } + + // Watch Parent events and enqueue Parent object key. + if err := c.Watch(&source.Kind{Type: p.Parent}, &handler.EnqueueRequestForObject{}); err != nil { + return nil, err + } + + // Watch and enqueue for owning obj key. + for _, t := range p.Owns { + if err := c.Watch(&source.Kind{Type: t}, + &handler.EnqueueRequestForOwner{OwnerType: p.Parent, IsController: true}); err != nil { + return nil, err + } + } + + return c, nil +} + +// Reconcile compares the actual state with the desired, and attempts to +// converge the two. +func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + ctx := context.TODO() + logger := logging.FromContext(ctx) + + logger.Infof("Reconciling %s %v", r.provider.Parent.GetObjectKind(), request) + + obj := r.provider.Parent.DeepCopyObject() + + err := r.client.Get(context.TODO(), request.NamespacedName, obj) + + if errors.IsNotFound(err) { + logger.Errorf("could not find %s %v\n", r.provider.Parent.GetObjectKind(), request) + return reconcile.Result{}, nil + } + + if err != nil { + logger.Errorf("could not fetch %s %v for %+v\n", r.provider.Parent.GetObjectKind(), err, request) + return reconcile.Result{}, err + } + + original := obj.DeepCopyObject() + + // Reconcile this copy of the Source and then write back any status + // updates regardless of whether the reconcile error out. + obj, err = r.provider.Reconciler.Reconcile(ctx, obj) + if err != nil { + logger.Warnf("Failed to reconcile %s: %v", r.provider.Parent.GetObjectKind(), err) + } + + if chg, err := hasChanged(ctx, original, obj); err != nil || !chg { + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the informer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + return reconcile.Result{}, err + } else if _, err := r.updateStatus(request, obj); err != nil { + logger.Warnf("Failed to update %s status: %v", r.provider.Parent.GetObjectKind(), err) + return reconcile.Result{}, err + } + + // Requeue if the resource is not ready: + return reconcile.Result{}, err +} + +func (r *reconciler) InjectClient(c client.Client) error { + r.client = c + r.provider.Reconciler.InjectClient(c) + return nil +} + +func (r *reconciler) InjectConfig(c *rest.Config) error { + r.restConfig = c + var err error + r.dynamicClient, err = dynamic.NewForConfig(c) + r.provider.Reconciler.InjectConfig(c) + return err +} + +func (r *reconciler) updateStatus(request reconcile.Request, object runtime.Object) (runtime.Object, error) { + freshObj := r.provider.Parent.DeepCopyObject() + if err := r.client.Get(context.TODO(), request.NamespacedName, freshObj); err != nil { + return nil, err + } + + fresh := NewReflectedStatusAccessor(freshObj) + org := NewReflectedStatusAccessor(object) + + fresh.SetStatus(org.GetStatus()) + + // Until #38113 is merged, we must use Update instead of UpdateStatus to + // update the Status block of the Source resource. UpdateStatus will not + // allow changes to the Spec of the resource, which is ideal for ensuring + // nothing other than resource status has been updated. + if err := r.client.Update(context.TODO(), freshObj); err != nil { + return nil, err + } + return freshObj, nil +} + +// Not worth fully duck typing since there's no shared schema. +type hasSpec struct { + Spec json.RawMessage `json:"spec"` +} + +func getSpecJSON(crd runtime.Object) ([]byte, error) { + b, err := json.Marshal(crd) + if err != nil { + return nil, err + } + hs := hasSpec{} + if err := json.Unmarshal(b, &hs); err != nil { + return nil, err + } + return []byte(hs.Spec), nil +} + +func hasChanged(ctx context.Context, old, new runtime.Object) (bool, error) { + if old == nil { + return true, nil + } + logger := logging.FromContext(ctx) + + oldSpecJSON, err := getSpecJSON(old) + if err != nil { + logger.Error("Failed to get Spec JSON for old", zap.Error(err)) + return false, err + } + newSpecJSON, err := getSpecJSON(new) + if err != nil { + logger.Error("Failed to get Spec JSON for new", zap.Error(err)) + return false, err + } + + specPatches, err := jsonpatch.CreatePatch(oldSpecJSON, newSpecJSON) + if err != nil { + fmt.Printf("Error creating JSON patch:%v", err) + return false, err + } + if len(specPatches) == 0 { + return false, nil + } + specPatchesJSON, err := json.Marshal(specPatches) + if err != nil { + logger.Error("Failed to marshal spec patches", zap.Error(err)) + return false, err + } + logger.Infof("Specs differ:\n%+v\n", string(specPatchesJSON)) + return true, nil +} + +// Conditions is the interface for a Resource that implements the getter and +// setter for accessing a Condition collection. +// +k8s:deepcopy-gen=true +type StatusAccessor interface { + GetStatus() interface{} + SetStatus(interface{}) +} + +// NewReflectedConditionsAccessor uses reflection to return a ConditionsAccessor +// to access the field called "Conditions". +func NewReflectedStatusAccessor(object interface{}) StatusAccessor { + objectValue := reflect.Indirect(reflect.ValueOf(object)) + + // If object is not a struct, don't even try to use it. + if objectValue.Kind() != reflect.Struct { + return nil + } + + statusField := objectValue.FieldByName("Status") + + if statusField.IsValid() && statusField.CanInterface() && statusField.CanSet() { + if _, ok := statusField.Interface().(interface{}); ok { + return &reflectedStatusAccessor{ + status: statusField, + } + } + } + return nil +} + +// reflectedConditionsAccessor is an internal wrapper object to act as the +// ConditionsAccessor for status objects that do not implement ConditionsAccessor +// directly, but do expose the field using the "Conditions" field name. +type reflectedStatusAccessor struct { + status reflect.Value +} + +// GetConditions uses reflection to return Conditions from the held status object. +func (r *reflectedStatusAccessor) GetStatus() interface{} { + if r != nil && r.status.IsValid() && r.status.CanInterface() { + if status, ok := r.status.Interface().(interface{}); ok { + return status + } + } + return nil +} + +// SetConditions uses reflection to set Conditions on the held status object. +func (r *reflectedStatusAccessor) SetStatus(status interface{}) { + if r != nil && r.status.IsValid() && r.status.CanSet() { + r.status.Set(reflect.ValueOf(status)) + } +} From 7c6908cf4fc28823e2a615c56fa5d7990796b068 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Mon, 15 Oct 2018 10:21:04 -0700 Subject: [PATCH 08/10] broke heartbeats into container and a example. --- cmd/controller/controller-runtime-main.go | 4 +- .../config/container.yaml} | 2 +- .../controller/provider.go | 6 +- .../controller/reconcile.go | 46 ++-- .../controller/resources/arguments.go | 10 +- .../controller/resources/channel.go | 4 +- .../controller/resources/deployment.go} | 67 +++--- .../heartbeats/config/examples/source.yaml | 8 +- pkg/provisioners/sdk/provider.go | 208 ------------------ pkg/provisioners/sdk/reconciler.go | 141 ++++++++++++ pkg/provisioners/sdk/status_accessor.go | 75 +++++++ 11 files changed, 293 insertions(+), 278 deletions(-) rename pkg/provisioners/{heartbeats/config/heartbeat.yaml => container/config/container.yaml} (88%) rename pkg/provisioners/{heartbeats => container}/controller/provider.go (91%) rename pkg/provisioners/{heartbeats => container}/controller/reconcile.go (78%) rename pkg/provisioners/{heartbeats => container}/controller/resources/arguments.go (74%) rename pkg/provisioners/{heartbeats => container}/controller/resources/channel.go (92%) rename pkg/provisioners/{heartbeats/controller/resources/pod.go => container/controller/resources/deployment.go} (53%) create mode 100644 pkg/provisioners/sdk/reconciler.go create mode 100644 pkg/provisioners/sdk/status_accessor.go diff --git a/cmd/controller/controller-runtime-main.go b/cmd/controller/controller-runtime-main.go index 1673ad68939..485b4b44899 100644 --- a/cmd/controller/controller-runtime-main.go +++ b/cmd/controller/controller-runtime-main.go @@ -23,7 +23,7 @@ import ( "github.com/knative/eventing/pkg/controller/eventing/subscription" "github.com/knative/eventing/pkg/controller/feed" "github.com/knative/eventing/pkg/controller/flow" - heartbeatcontroller "github.com/knative/eventing/pkg/provisioners/heartbeats/controller" + containercontroller "github.com/knative/eventing/pkg/provisioners/container/controller" "go.uber.org/zap" "strings" @@ -50,7 +50,7 @@ type ProvideFunc func(manager.Manager) (controller.Controller, error) // be added to the default providers list. var ExperimentalControllers = map[string]ProvideFunc{ "subscription.eventing.knative.dev": subscription.ProvideController, - "heartbeats-provisioner": heartbeatcontroller.ProvideController, + "container-provisioner": containercontroller.ProvideController, } // controllerRuntimeStart runs controllers written for controller-runtime. It's diff --git a/pkg/provisioners/heartbeats/config/heartbeat.yaml b/pkg/provisioners/container/config/container.yaml similarity index 88% rename from pkg/provisioners/heartbeats/config/heartbeat.yaml rename to pkg/provisioners/container/config/container.yaml index 5f9ca6f7fda..cef82c759bc 100644 --- a/pkg/provisioners/heartbeats/config/heartbeat.yaml +++ b/pkg/provisioners/container/config/container.yaml @@ -1,7 +1,7 @@ apiVersion: eventing.knative.dev/v1alpha1 kind: ClusterProvisioner metadata: - name: heartbeats + name: container spec: reconciles: group: eventing.knative.dev diff --git a/pkg/provisioners/heartbeats/controller/provider.go b/pkg/provisioners/container/controller/provider.go similarity index 91% rename from pkg/provisioners/heartbeats/controller/provider.go rename to pkg/provisioners/container/controller/provider.go index c572951c379..0d502943486 100644 --- a/pkg/provisioners/heartbeats/controller/provider.go +++ b/pkg/provisioners/container/controller/provider.go @@ -19,7 +19,7 @@ package controller import ( "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners/sdk" - corev1 "k8s.io/api/core/v1" + appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" @@ -32,7 +32,7 @@ import ( const ( // controllerAgentName is the string used by this controller to identify // itself when creating events. - controllerAgentName = "heartbeats-provisioner-controller" + controllerAgentName = "container-provisioner-controller" ) type reconciler struct { @@ -47,7 +47,7 @@ func ProvideController(mgr manager.Manager) (controller.Controller, error) { p := &sdk.Provider{ AgentName: controllerAgentName, Parent: &v1alpha1.Source{}, - Owns: []runtime.Object{&corev1.Pod{}, &v1alpha1.Channel{}}, + Owns: []runtime.Object{&appsv1.Deployment{}, &v1alpha1.Channel{}}, Reconciler: &reconciler{ recorder: mgr.GetRecorder(controllerAgentName), }, diff --git a/pkg/provisioners/heartbeats/controller/reconcile.go b/pkg/provisioners/container/controller/reconcile.go similarity index 78% rename from pkg/provisioners/heartbeats/controller/reconcile.go rename to pkg/provisioners/container/controller/reconcile.go index 131311f749b..bb0d95071cd 100644 --- a/pkg/provisioners/heartbeats/controller/reconcile.go +++ b/pkg/provisioners/container/controller/reconcile.go @@ -20,8 +20,9 @@ import ( "context" "encoding/json" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/provisioners/heartbeats/controller/resources" + "github.com/knative/eventing/pkg/provisioners/container_provisioner/controller/resources" "github.com/knative/pkg/logging" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -32,7 +33,7 @@ import ( ) const ( - provisionerName = "heartbeats" + provisionerName = "container" ) // Reconcile compares the actual state with the desired, and attempts to @@ -48,7 +49,7 @@ func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) (runt } if source.Spec.Provisioner.Ref.Name != provisionerName { - logger.Errorf("heartbeats skipping source %s, provisioned by %s\n", source.Name, source.Spec.Provisioner.Ref.Name) + logger.Errorf("skipping source %s, provisioned by %s\n", source.Name, source.Spec.Provisioner.Ref.Name) return source, nil } @@ -63,8 +64,7 @@ func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) (runt //} //deletionTimestamp := accessor.GetDeletionTimestamp() - args := &resources.HeartBeatArguments{} - + args := &resources.ContainerArguments{} if source.Spec.Arguments != nil { if err := json.Unmarshal(source.Spec.Arguments.Raw, args); err != nil { logger.Infof("Error: %s failed to unmarshal arguments, %v", source.Name, err) @@ -97,29 +97,27 @@ func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) (runt } - pod, err := r.getPod(ctx, source) + deploy, err := r.getDeployment(ctx, source) if err != nil { - fqn := "Pod.core/v1" + fqn := "Deployment.apps/v1" if errors.IsNotFound(err) { - pod, err = r.createPod(ctx, source, nil, channel, args) + deploy, err = r.createDeployment(ctx, source, nil, channel, args) if err != nil { r.recorder.Eventf(source, corev1.EventTypeNormal, "Blocked", "waiting for %v", err) source.Status.SetProvisionedObjectState(args.Name, fqn, "Blocked", "waiting for %v", args.Name, err) return object, err } - r.recorder.Eventf(source, corev1.EventTypeNormal, "Provisioned", "Created pod %q", pod.Name) - source.Status.SetProvisionedObjectState(pod.Name, fqn, "Created", "Created pod %q", pod.Name) + r.recorder.Eventf(source, corev1.EventTypeNormal, "Provisioned", "Created deployment %q", deploy.Name) + source.Status.SetProvisionedObjectState(deploy.Name, fqn, "Created", "Created deployment %q", deploy.Name) source.Status.MarkDeprovisioned("Provisioning", "Provisioning Pod %s", args.Name) } else { - if pod.Status.Phase == corev1.PodRunning { - source.Status.SetProvisionedObjectState(pod.Name, fqn, "Ready", "") + if deploy.Status.ReadyReplicas > 0 { + source.Status.SetProvisionedObjectState(deploy.Name, fqn, "Ready", "") source.Status.MarkProvisioned() } } } - // TODO: need informers for the object we are controlling - return source, nil } @@ -154,7 +152,7 @@ func (r *reconciler) getChannel(ctx context.Context, source *v1alpha1.Source) (* return nil, errors.NewNotFound(schema.GroupResource{}, "") } -func (r *reconciler) createChannel(ctx context.Context, source *v1alpha1.Source, org *v1alpha1.Channel, args *resources.HeartBeatArguments) (*v1alpha1.Channel, error) { +func (r *reconciler) createChannel(ctx context.Context, source *v1alpha1.Source, org *v1alpha1.Channel, args *resources.ContainerArguments) (*v1alpha1.Channel, error) { channel, err := resources.MakeChannel(source, org, args) if err != nil { return nil, err @@ -166,10 +164,10 @@ func (r *reconciler) createChannel(ctx context.Context, source *v1alpha1.Source, return channel, nil } -func (r *reconciler) getPod(ctx context.Context, source *v1alpha1.Source) (*corev1.Pod, error) { +func (r *reconciler) getDeployment(ctx context.Context, source *v1alpha1.Source) (*appsv1.Deployment, error) { logger := logging.FromContext(ctx) - list := &corev1.PodList{} + list := &appsv1.DeploymentList{} err := r.client.List( ctx, &client.ListOptions{ @@ -179,14 +177,14 @@ func (r *reconciler) getPod(ctx context.Context, source *v1alpha1.Source) (*core // needed. Raw: &metav1.ListOptions{ TypeMeta: metav1.TypeMeta{ - APIVersion: corev1.SchemeGroupVersion.String(), - Kind: "Pod", + APIVersion: appsv1.SchemeGroupVersion.String(), + Kind: "Deployment", }, }, }, list) if err != nil { - logger.Errorf("Unable to list pods: %v", err) + logger.Errorf("Unable to list deployments: %v", err) return nil, err } for _, c := range list.Items { @@ -197,14 +195,14 @@ func (r *reconciler) getPod(ctx context.Context, source *v1alpha1.Source) (*core return nil, errors.NewNotFound(schema.GroupResource{}, "") } -func (r *reconciler) createPod(ctx context.Context, source *v1alpha1.Source, org *corev1.Pod, channel *v1alpha1.Channel, args *resources.HeartBeatArguments) (*corev1.Pod, error) { - pod, err := resources.MakePod(source, org, channel, args) +func (r *reconciler) createDeployment(ctx context.Context, source *v1alpha1.Source, org *appsv1.Deployment, channel *v1alpha1.Channel, args *resources.ContainerArguments) (*appsv1.Deployment, error) { + deployment, err := resources.MakeDeployment(source, org, channel, args) if err != nil { return nil, err } - if err := r.client.Create(ctx, pod); err != nil { + if err := r.client.Create(ctx, deployment); err != nil { return nil, err } - return pod, nil + return deployment, nil } diff --git a/pkg/provisioners/heartbeats/controller/resources/arguments.go b/pkg/provisioners/container/controller/resources/arguments.go similarity index 74% rename from pkg/provisioners/heartbeats/controller/resources/arguments.go rename to pkg/provisioners/container/controller/resources/arguments.go index 6ab3b5f6db6..1cdd0b369bc 100644 --- a/pkg/provisioners/heartbeats/controller/resources/arguments.go +++ b/pkg/provisioners/container/controller/resources/arguments.go @@ -16,9 +16,9 @@ limitations under the License. package resources -type HeartBeatArguments struct { - Name string `json:"-"` - Namespace string `json:"-"` - Label string `json:"label"` - Period int `json:"period"` +type ContainerArguments struct { + Name string `json:"-"` + Namespace string `json:"-"` + Image string `json:"image"` + Args map[string]string `json:"args"` } diff --git a/pkg/provisioners/heartbeats/controller/resources/channel.go b/pkg/provisioners/container/controller/resources/channel.go similarity index 92% rename from pkg/provisioners/heartbeats/controller/resources/channel.go rename to pkg/provisioners/container/controller/resources/channel.go index b92d34420d0..0161abf6415 100644 --- a/pkg/provisioners/heartbeats/controller/resources/channel.go +++ b/pkg/provisioners/container/controller/resources/channel.go @@ -24,7 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func MakeChannel(source *v1alpha1.Source, org *v1alpha1.Channel, args *HeartBeatArguments) (*v1alpha1.Channel, error) { +func MakeChannel(source *v1alpha1.Source, org *v1alpha1.Channel, args *ContainerArguments) (*v1alpha1.Channel, error) { channel := &v1alpha1.Channel{ TypeMeta: metav1.TypeMeta{ APIVersion: "eventing.knative.dev/v1alpha1", @@ -40,7 +40,7 @@ func MakeChannel(source *v1alpha1.Source, org *v1alpha1.Channel, args *HeartBeat Spec: v1alpha1.ChannelSpec{ Provisioner: &v1alpha1.ProvisionerReference{ Ref: &corev1.ObjectReference{ - Name: "in-memory-channel", + Name: "in-memory-channel", // TODO: this should be changeable APIVersion: "eventing.knative.dev/v1alpha1", Kind: "ClusterProvisioner", }, diff --git a/pkg/provisioners/heartbeats/controller/resources/pod.go b/pkg/provisioners/container/controller/resources/deployment.go similarity index 53% rename from pkg/provisioners/heartbeats/controller/resources/pod.go rename to pkg/provisioners/container/controller/resources/deployment.go index ccbe108fcfa..f7673ffb4fb 100644 --- a/pkg/provisioners/heartbeats/controller/resources/pod.go +++ b/pkg/provisioners/container/controller/resources/deployment.go @@ -17,39 +17,35 @@ limitations under the License. package resources import ( + "fmt" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/controller" - "fmt" - + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const ( - image = "gcr.io/plori-nicholss/cmd-f2e9e0e7e0a0f8a66b5b6f8d85f4c98b@sha256:d64b5fd7b4c8c5fdc70cca7732fcf384d3069a4a1762a69e58d701a040345aed" -) - -func MakePod(source *v1alpha1.Source, org *corev1.Pod, channel *v1alpha1.Channel, args *HeartBeatArguments) (*corev1.Pod, error) { +func MakeDeployment(source *v1alpha1.Source, org *appsv1.Deployment, channel *v1alpha1.Channel, args *ContainerArguments) (*appsv1.Deployment, error) { if channel == nil || channel.Status.Sinkable.DomainInternal == "" { return nil, fmt.Errorf("channel not ready") } - remote := fmt.Sprintf("--remote=http://%s", channel.Status.Sinkable.DomainInternal) - period := "" - if args.Period > 0 { - period = fmt.Sprintf("--period=%d", args.Period) - } - label := "" - if args.Label != "" { - period = fmt.Sprintf("--label=%s", args.Label) + containerArgs := []string(nil) + if args != nil { + containerArgs = make([]string, 0, len(args.Args)+1) + for k, v := range args.Args { + containerArgs = append(containerArgs, fmt.Sprintf("--%s=%q", k, v)) + } } + remote := fmt.Sprintf("--remote=http://%s", channel.Status.Sinkable.DomainInternal) + containerArgs = append(containerArgs, remote) - pod := &corev1.Pod{ + deploy := &appsv1.Deployment{ TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "Pod", + APIVersion: "apps/v1", + Kind: "Deployment", }, ObjectMeta: metav1.ObjectMeta{ GenerateName: args.Name + "-", @@ -60,22 +56,33 @@ func MakePod(source *v1alpha1.Source, org *corev1.Pod, channel *v1alpha1.Channel Annotations: map[string]string{ "sidecar.istio.io/inject": "true", }, - Labels: map[string]string{ - //"provisonereventing.knative.dev" - }, }, - Spec: corev1.PodSpec{ - RestartPolicy: corev1.RestartPolicyOnFailure, - Containers: []corev1.Container{ - { - Name: "heartbeat", - Image: image, - Args: []string{ - remote, period, label, + Spec: appsv1.DeploymentSpec{ + Replicas: func() *int32 { var i int32 = 1; return &i }(), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "source": args.Name, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "source": args.Name, + }, + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyOnFailure, + Containers: []corev1.Container{ + { + Name: "source", + Image: args.Image, + Args: containerArgs, + ImagePullPolicy: corev1.PullAlways, + }, }, }, }, }, } - return pod, nil + return deploy, nil } diff --git a/pkg/provisioners/heartbeats/config/examples/source.yaml b/pkg/provisioners/heartbeats/config/examples/source.yaml index e5b29856fc9..c81c27bb56c 100644 --- a/pkg/provisioners/heartbeats/config/examples/source.yaml +++ b/pkg/provisioners/heartbeats/config/examples/source.yaml @@ -6,7 +6,9 @@ metadata: spec: provisioner: ref: - name: heartbeats + name: container arguments: - label: "<3" - period: 2 + image: github.com/knative/eventing/pkg/provisioners/heartbeats/cmd/ + args: + label: "<3" + period: 2 diff --git a/pkg/provisioners/sdk/provider.go b/pkg/provisioners/sdk/provider.go index 934ebd8dc77..40b36a6a44a 100644 --- a/pkg/provisioners/sdk/provider.go +++ b/pkg/provisioners/sdk/provider.go @@ -18,22 +18,12 @@ package sdk import ( "context" - "encoding/json" - "fmt" - "github.com/knative/pkg/logging" - "github.com/mattbaird/jsonpatch" - "go.uber.org/zap" - "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/dynamic" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/record" - "reflect" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" ) @@ -43,18 +33,6 @@ type KnativeReconciler interface { InjectConfig(c *rest.Config) error } -type reconciler struct { - client client.Client - restConfig *rest.Config - dynamicClient dynamic.Interface - recorder record.EventRecorder - - provider Provider -} - -// Verify the struct implements reconcile.Reconciler -var _ reconcile.Reconciler = &reconciler{} - type Provider struct { AgentName string // Parent is a resource kind to reconcile with empty content. i.e. &v1.Parent{} @@ -94,189 +72,3 @@ func (p *Provider) ProvideController(mgr manager.Manager) (controller.Controller return c, nil } - -// Reconcile compares the actual state with the desired, and attempts to -// converge the two. -func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { - ctx := context.TODO() - logger := logging.FromContext(ctx) - - logger.Infof("Reconciling %s %v", r.provider.Parent.GetObjectKind(), request) - - obj := r.provider.Parent.DeepCopyObject() - - err := r.client.Get(context.TODO(), request.NamespacedName, obj) - - if errors.IsNotFound(err) { - logger.Errorf("could not find %s %v\n", r.provider.Parent.GetObjectKind(), request) - return reconcile.Result{}, nil - } - - if err != nil { - logger.Errorf("could not fetch %s %v for %+v\n", r.provider.Parent.GetObjectKind(), err, request) - return reconcile.Result{}, err - } - - original := obj.DeepCopyObject() - - // Reconcile this copy of the Source and then write back any status - // updates regardless of whether the reconcile error out. - obj, err = r.provider.Reconciler.Reconcile(ctx, obj) - if err != nil { - logger.Warnf("Failed to reconcile %s: %v", r.provider.Parent.GetObjectKind(), err) - } - - if chg, err := hasChanged(ctx, original, obj); err != nil || !chg { - // If we didn't change anything then don't call updateStatus. - // This is important because the copy we loaded from the informer's - // cache may be stale and we don't want to overwrite a prior update - // to status with this stale state. - return reconcile.Result{}, err - } else if _, err := r.updateStatus(request, obj); err != nil { - logger.Warnf("Failed to update %s status: %v", r.provider.Parent.GetObjectKind(), err) - return reconcile.Result{}, err - } - - // Requeue if the resource is not ready: - return reconcile.Result{}, err -} - -func (r *reconciler) InjectClient(c client.Client) error { - r.client = c - r.provider.Reconciler.InjectClient(c) - return nil -} - -func (r *reconciler) InjectConfig(c *rest.Config) error { - r.restConfig = c - var err error - r.dynamicClient, err = dynamic.NewForConfig(c) - r.provider.Reconciler.InjectConfig(c) - return err -} - -func (r *reconciler) updateStatus(request reconcile.Request, object runtime.Object) (runtime.Object, error) { - freshObj := r.provider.Parent.DeepCopyObject() - if err := r.client.Get(context.TODO(), request.NamespacedName, freshObj); err != nil { - return nil, err - } - - fresh := NewReflectedStatusAccessor(freshObj) - org := NewReflectedStatusAccessor(object) - - fresh.SetStatus(org.GetStatus()) - - // Until #38113 is merged, we must use Update instead of UpdateStatus to - // update the Status block of the Source resource. UpdateStatus will not - // allow changes to the Spec of the resource, which is ideal for ensuring - // nothing other than resource status has been updated. - if err := r.client.Update(context.TODO(), freshObj); err != nil { - return nil, err - } - return freshObj, nil -} - -// Not worth fully duck typing since there's no shared schema. -type hasSpec struct { - Spec json.RawMessage `json:"spec"` -} - -func getSpecJSON(crd runtime.Object) ([]byte, error) { - b, err := json.Marshal(crd) - if err != nil { - return nil, err - } - hs := hasSpec{} - if err := json.Unmarshal(b, &hs); err != nil { - return nil, err - } - return []byte(hs.Spec), nil -} - -func hasChanged(ctx context.Context, old, new runtime.Object) (bool, error) { - if old == nil { - return true, nil - } - logger := logging.FromContext(ctx) - - oldSpecJSON, err := getSpecJSON(old) - if err != nil { - logger.Error("Failed to get Spec JSON for old", zap.Error(err)) - return false, err - } - newSpecJSON, err := getSpecJSON(new) - if err != nil { - logger.Error("Failed to get Spec JSON for new", zap.Error(err)) - return false, err - } - - specPatches, err := jsonpatch.CreatePatch(oldSpecJSON, newSpecJSON) - if err != nil { - fmt.Printf("Error creating JSON patch:%v", err) - return false, err - } - if len(specPatches) == 0 { - return false, nil - } - specPatchesJSON, err := json.Marshal(specPatches) - if err != nil { - logger.Error("Failed to marshal spec patches", zap.Error(err)) - return false, err - } - logger.Infof("Specs differ:\n%+v\n", string(specPatchesJSON)) - return true, nil -} - -// Conditions is the interface for a Resource that implements the getter and -// setter for accessing a Condition collection. -// +k8s:deepcopy-gen=true -type StatusAccessor interface { - GetStatus() interface{} - SetStatus(interface{}) -} - -// NewReflectedConditionsAccessor uses reflection to return a ConditionsAccessor -// to access the field called "Conditions". -func NewReflectedStatusAccessor(object interface{}) StatusAccessor { - objectValue := reflect.Indirect(reflect.ValueOf(object)) - - // If object is not a struct, don't even try to use it. - if objectValue.Kind() != reflect.Struct { - return nil - } - - statusField := objectValue.FieldByName("Status") - - if statusField.IsValid() && statusField.CanInterface() && statusField.CanSet() { - if _, ok := statusField.Interface().(interface{}); ok { - return &reflectedStatusAccessor{ - status: statusField, - } - } - } - return nil -} - -// reflectedConditionsAccessor is an internal wrapper object to act as the -// ConditionsAccessor for status objects that do not implement ConditionsAccessor -// directly, but do expose the field using the "Conditions" field name. -type reflectedStatusAccessor struct { - status reflect.Value -} - -// GetConditions uses reflection to return Conditions from the held status object. -func (r *reflectedStatusAccessor) GetStatus() interface{} { - if r != nil && r.status.IsValid() && r.status.CanInterface() { - if status, ok := r.status.Interface().(interface{}); ok { - return status - } - } - return nil -} - -// SetConditions uses reflection to set Conditions on the held status object. -func (r *reflectedStatusAccessor) SetStatus(status interface{}) { - if r != nil && r.status.IsValid() && r.status.CanSet() { - r.status.Set(reflect.ValueOf(status)) - } -} diff --git a/pkg/provisioners/sdk/reconciler.go b/pkg/provisioners/sdk/reconciler.go new file mode 100644 index 00000000000..28372af8990 --- /dev/null +++ b/pkg/provisioners/sdk/reconciler.go @@ -0,0 +1,141 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sdk + +import ( + "context" + "github.com/knative/pkg/logging" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/reconcile" +) + +type reconciler struct { + client client.Client + restConfig *rest.Config + dynamicClient dynamic.Interface + recorder record.EventRecorder + + provider Provider +} + +// Verify the struct implements reconcile.Reconciler +var _ reconcile.Reconciler = &reconciler{} + +// Reconcile compares the actual state with the desired, and attempts to +// converge the two. +func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { + ctx := context.TODO() + logger := logging.FromContext(ctx) + + logger.Infof("Reconciling %s %v", r.provider.Parent.GetObjectKind(), request) + + obj := r.provider.Parent.DeepCopyObject() + + err := r.client.Get(context.TODO(), request.NamespacedName, obj) + + if errors.IsNotFound(err) { + logger.Errorf("could not find %s %v\n", r.provider.Parent.GetObjectKind(), request) + return reconcile.Result{}, nil + } + + if err != nil { + logger.Errorf("could not fetch %s %v for %+v\n", r.provider.Parent.GetObjectKind(), err, request) + return reconcile.Result{}, err + } + + original := obj.DeepCopyObject() + + // Reconcile this copy of the Source and then write back any status + // updates regardless of whether the reconcile error out. + obj, err = r.provider.Reconciler.Reconcile(ctx, obj) + if err != nil { + logger.Warnf("Failed to reconcile %s: %v", r.provider.Parent.GetObjectKind(), err) + } + + if chg, err := r.statusHasChanged(ctx, original, obj); err != nil || !chg { + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the informer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + return reconcile.Result{}, err + } else if _, err := r.updateStatus(ctx, request, obj); err != nil { + logger.Warnf("Failed to update %s status: %v", r.provider.Parent.GetObjectKind(), err) + return reconcile.Result{}, err + } + + // Requeue if the resource is not ready: + return reconcile.Result{}, err +} + +func (r *reconciler) InjectClient(c client.Client) error { + r.client = c + r.provider.Reconciler.InjectClient(c) + return nil +} + +func (r *reconciler) InjectConfig(c *rest.Config) error { + r.restConfig = c + var err error + r.dynamicClient, err = dynamic.NewForConfig(c) + r.provider.Reconciler.InjectConfig(c) + return err +} + +func (r *reconciler) statusHasChanged(ctx context.Context, old, new runtime.Object) (bool, error) { + if old == nil { + return true, nil + } + + o := NewReflectedStatusAccessor(old) + n := NewReflectedStatusAccessor(new) + + oStatus := o.GetStatus() + nStatus := n.GetStatus() + + if equality.Semantic.DeepEqual(oStatus, nStatus) { + return false, nil + } + + return true, nil +} + +func (r *reconciler) updateStatus(ctx context.Context, request reconcile.Request, object runtime.Object) (runtime.Object, error) { + freshObj := r.provider.Parent.DeepCopyObject() + if err := r.client.Get(ctx, request.NamespacedName, freshObj); err != nil { + return nil, err + } + + fresh := NewReflectedStatusAccessor(freshObj) + org := NewReflectedStatusAccessor(object) + + fresh.SetStatus(org.GetStatus()) + + // Until #38113 is merged, we must use Update instead of UpdateStatus to + // update the Status block of the Source resource. UpdateStatus will not + // allow changes to the Spec of the resource, which is ideal for ensuring + // nothing other than resource status has been updated. + if err := r.client.Update(ctx, freshObj); err != nil { + return nil, err + } + return freshObj, nil +} diff --git a/pkg/provisioners/sdk/status_accessor.go b/pkg/provisioners/sdk/status_accessor.go new file mode 100644 index 00000000000..ce5712b268f --- /dev/null +++ b/pkg/provisioners/sdk/status_accessor.go @@ -0,0 +1,75 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sdk + +import ( + "reflect" +) + +// Conditions is the interface for a Resource that implements the getter and +// setter for accessing a Condition collection. +// +k8s:deepcopy-gen=true +type StatusAccessor interface { + GetStatus() interface{} + SetStatus(interface{}) +} + +// NewReflectedConditionsAccessor uses reflection to return a ConditionsAccessor +// to access the field called "Conditions". +func NewReflectedStatusAccessor(object interface{}) StatusAccessor { + objectValue := reflect.Indirect(reflect.ValueOf(object)) + + // If object is not a struct, don't even try to use it. + if objectValue.Kind() != reflect.Struct { + return nil + } + + statusField := objectValue.FieldByName("Status") + + if statusField.IsValid() && statusField.CanInterface() && statusField.CanSet() { + if _, ok := statusField.Interface().(interface{}); ok { + return &reflectedStatusAccessor{ + status: statusField, + } + } + } + return nil +} + +// reflectedConditionsAccessor is an internal wrapper object to act as the +// ConditionsAccessor for status objects that do not implement ConditionsAccessor +// directly, but do expose the field using the "Conditions" field name. +type reflectedStatusAccessor struct { + status reflect.Value +} + +// GetConditions uses reflection to return Conditions from the held status object. +func (r *reflectedStatusAccessor) GetStatus() interface{} { + if r != nil && r.status.IsValid() && r.status.CanInterface() { + if status, ok := r.status.Interface().(interface{}); ok { + return status + } + } + return nil +} + +// SetConditions uses reflection to set Conditions on the held status object. +func (r *reflectedStatusAccessor) SetStatus(status interface{}) { + if r != nil && r.status.IsValid() && r.status.CanSet() { + r.status.Set(reflect.ValueOf(status)) + } +} From d24e123917126d5f035eba14c3922da6f7337c67 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Mon, 15 Oct 2018 15:13:45 -0700 Subject: [PATCH 09/10] workes with some mondo changes to channels. --- config/500-controller.yaml | 2 +- .../container/controller/reconcile.go | 26 +++++++++++-------- .../controller/resources/deployment.go | 4 --- pkg/provisioners/sdk/provider.go | 1 + pkg/provisioners/sdk/reconciler.go | 8 ++++-- .../heartbeats/cmd/main.go | 21 ++++++++++----- .../heartbeats/config}/source.yaml | 6 ++--- .../heartbeats/config}/subscription.yaml | 4 +-- 8 files changed, 43 insertions(+), 29 deletions(-) rename pkg/{provisioners => sources}/heartbeats/cmd/main.go (71%) rename pkg/{provisioners/heartbeats/config/examples => sources/heartbeats/config}/source.yaml (63%) rename pkg/{provisioners/heartbeats/config/examples => sources/heartbeats/config}/subscription.yaml (87%) diff --git a/config/500-controller.yaml b/config/500-controller.yaml index 83d22d65e3b..fd5efda3a76 100644 --- a/config/500-controller.yaml +++ b/config/500-controller.yaml @@ -31,7 +31,7 @@ spec: args: [ "-logtostderr", "-stderrthreshold", "INFO", - "--experimentalControllers=subscription.eventing.knative.dev,heartbeats-provisioner" # comma separated list. + "--experimentalControllers=subscription.eventing.knative.dev,container-provisioner" # comma separated list. ] volumeMounts: - name: config-logging diff --git a/pkg/provisioners/container/controller/reconcile.go b/pkg/provisioners/container/controller/reconcile.go index bb0d95071cd..288c6bb5f22 100644 --- a/pkg/provisioners/container/controller/reconcile.go +++ b/pkg/provisioners/container/controller/reconcile.go @@ -20,11 +20,12 @@ import ( "context" "encoding/json" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/provisioners/container_provisioner/controller/resources" + "github.com/knative/eventing/pkg/provisioners/container/controller/resources" "github.com/knative/pkg/logging" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -53,16 +54,19 @@ func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) (runt return source, nil } - source.Status.InitializeConditions() + // See if the source has been deleted + accessor, err := meta.Accessor(source) + if err != nil { + logger.Warnf("Failed to get metadata accessor: %s", err) + return object, err + } + // No need to reconcile if the source has been marked for deletion. + deletionTimestamp := accessor.GetDeletionTimestamp() + if deletionTimestamp != nil { + return object, err + } - // TODO: - //// See if the source has been deleted - //accessor, err := meta.Accessor(source) - //if err != nil { - // logger.Warnf("Failed to get metadata accessor: %s", err) - // return err - //} - //deletionTimestamp := accessor.GetDeletionTimestamp() + source.Status.InitializeConditions() args := &resources.ContainerArguments{} if source.Spec.Arguments != nil { @@ -109,7 +113,7 @@ func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) (runt } r.recorder.Eventf(source, corev1.EventTypeNormal, "Provisioned", "Created deployment %q", deploy.Name) source.Status.SetProvisionedObjectState(deploy.Name, fqn, "Created", "Created deployment %q", deploy.Name) - source.Status.MarkDeprovisioned("Provisioning", "Provisioning Pod %s", args.Name) + source.Status.MarkDeprovisioned("Provisioning", "Provisioning deployment %s", args.Name) } else { if deploy.Status.ReadyReplicas > 0 { source.Status.SetProvisionedObjectState(deploy.Name, fqn, "Ready", "") diff --git a/pkg/provisioners/container/controller/resources/deployment.go b/pkg/provisioners/container/controller/resources/deployment.go index f7673ffb4fb..5886f5758d3 100644 --- a/pkg/provisioners/container/controller/resources/deployment.go +++ b/pkg/provisioners/container/controller/resources/deployment.go @@ -53,9 +53,6 @@ func MakeDeployment(source *v1alpha1.Source, org *appsv1.Deployment, channel *v1 OwnerReferences: []metav1.OwnerReference{ *controller.NewControllerRef(source, false), }, - Annotations: map[string]string{ - "sidecar.istio.io/inject": "true", - }, }, Spec: appsv1.DeploymentSpec{ Replicas: func() *int32 { var i int32 = 1; return &i }(), @@ -71,7 +68,6 @@ func MakeDeployment(source *v1alpha1.Source, org *appsv1.Deployment, channel *v1 }, }, Spec: corev1.PodSpec{ - RestartPolicy: corev1.RestartPolicyOnFailure, Containers: []corev1.Container{ { Name: "source", diff --git a/pkg/provisioners/sdk/provider.go b/pkg/provisioners/sdk/provider.go index 40b36a6a44a..4dd714fdc72 100644 --- a/pkg/provisioners/sdk/provider.go +++ b/pkg/provisioners/sdk/provider.go @@ -50,6 +50,7 @@ func (p *Provider) ProvideController(mgr manager.Manager) (controller.Controller // Setup a new controller to Reconcile Subscriptions. c, err := controller.New(p.AgentName, mgr, controller.Options{ Reconciler: &reconciler{ + provider: *p, recorder: mgr.GetRecorder(p.AgentName), }, }) diff --git a/pkg/provisioners/sdk/reconciler.go b/pkg/provisioners/sdk/reconciler.go index 28372af8990..3c63574812c 100644 --- a/pkg/provisioners/sdk/reconciler.go +++ b/pkg/provisioners/sdk/reconciler.go @@ -89,7 +89,9 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err func (r *reconciler) InjectClient(c client.Client) error { r.client = c - r.provider.Reconciler.InjectClient(c) + if r.provider.Reconciler != nil { + r.provider.Reconciler.InjectClient(c) + } return nil } @@ -97,7 +99,9 @@ func (r *reconciler) InjectConfig(c *rest.Config) error { r.restConfig = c var err error r.dynamicClient, err = dynamic.NewForConfig(c) - r.provider.Reconciler.InjectConfig(c) + if r.provider.Reconciler != nil { + r.provider.Reconciler.InjectConfig(c) + } return err } diff --git a/pkg/provisioners/heartbeats/cmd/main.go b/pkg/sources/heartbeats/cmd/main.go similarity index 71% rename from pkg/provisioners/heartbeats/cmd/main.go rename to pkg/sources/heartbeats/cmd/main.go index e506c604c9f..99a94f02568 100644 --- a/pkg/provisioners/heartbeats/cmd/main.go +++ b/pkg/sources/heartbeats/cmd/main.go @@ -7,6 +7,7 @@ import ( "io" "log" "net/http" + "strconv" "strings" "time" ) @@ -17,17 +18,18 @@ type Heartbeat struct { } var ( - remote string - label string - period int - sequence int - hb *Heartbeat + remote string + label string + periodStr string + period int + sequence int + hb *Heartbeat ) func init() { flag.StringVar(&remote, "remote", "", "the host url to heartbeat to") flag.StringVar(&label, "label", "", "a special label") - flag.IntVar(&period, "period", 5, "the number of seconds between heartbeats") + flag.StringVar(&periodStr, "period", "5", "the number of seconds between heartbeats") } func main() { @@ -38,6 +40,11 @@ func main() { Label: label, } + period, err := strconv.Atoi(periodStr) + if err != nil { + period = 5 + } + for { send() time.Sleep(time.Duration(period) * time.Second) @@ -50,6 +57,8 @@ func send() { if err != nil { log.Printf("Unable to make request: %+v, %v", hb, err) return + } else { + log.Printf("[%d]: %+v", resp.StatusCode, hb) } defer resp.Body.Close() } diff --git a/pkg/provisioners/heartbeats/config/examples/source.yaml b/pkg/sources/heartbeats/config/source.yaml similarity index 63% rename from pkg/provisioners/heartbeats/config/examples/source.yaml rename to pkg/sources/heartbeats/config/source.yaml index c81c27bb56c..32aea6d35d8 100644 --- a/pkg/provisioners/heartbeats/config/examples/source.yaml +++ b/pkg/sources/heartbeats/config/source.yaml @@ -1,14 +1,14 @@ apiVersion: eventing.knative.dev/v1alpha1 kind: Source metadata: - name: love + name: heartbeats namespace: default spec: provisioner: ref: name: container arguments: - image: github.com/knative/eventing/pkg/provisioners/heartbeats/cmd/ + image: github.com/knative/eventing/pkg/sources/heartbeats/cmd/ args: label: "<3" - period: 2 + period: "2" diff --git a/pkg/provisioners/heartbeats/config/examples/subscription.yaml b/pkg/sources/heartbeats/config/subscription.yaml similarity index 87% rename from pkg/provisioners/heartbeats/config/examples/subscription.yaml rename to pkg/sources/heartbeats/config/subscription.yaml index d879d083e5a..d1b361bcfeb 100644 --- a/pkg/provisioners/heartbeats/config/examples/subscription.yaml +++ b/pkg/sources/heartbeats/config/subscription.yaml @@ -1,13 +1,13 @@ apiVersion: eventing.knative.dev/v1alpha1 kind: Subscription metadata: - name: love-dumper + name: heartbeats namespace: default spec: from: kind: Source apiVersion: eventing.knative.dev/v1alpha1 - name: love + name: heartbeats call: target: kind: Service From 109b59c217dddd6a6e9afc2e30edd829e95e1d44 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Mon, 15 Oct 2018 15:25:29 -0700 Subject: [PATCH 10/10] Update codegen. --- Gopkg.lock | 1 + .../container/controller/resources/channel.go | 3 ++ third_party/VENDOR-LICENSE | 33 +++++++++++++++++++ 3 files changed, 37 insertions(+) diff --git a/Gopkg.lock b/Gopkg.lock index fe699746ecf..b3d6fa0b8fe 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1091,6 +1091,7 @@ "go.uber.org/zap/zapcore", "golang.org/x/net/context", "golang.org/x/oauth2", + "golang.org/x/sync/errgroup", "google.golang.org/grpc/codes", "google.golang.org/grpc/status", "gopkg.in/go-playground/webhooks.v3", diff --git a/pkg/provisioners/container/controller/resources/channel.go b/pkg/provisioners/container/controller/resources/channel.go index 0161abf6415..93aa6d3da44 100644 --- a/pkg/provisioners/container/controller/resources/channel.go +++ b/pkg/provisioners/container/controller/resources/channel.go @@ -48,6 +48,9 @@ func MakeChannel(source *v1alpha1.Source, org *v1alpha1.Channel, args *Container }, } if org != nil { + // We need to keep the generation from the original channel until we use the correct + // generation generation. The knative/pkg webhook will reject the update if we do not + // do this. channel.Spec.Generation = org.Spec.Generation } return channel, nil diff --git a/third_party/VENDOR-LICENSE b/third_party/VENDOR-LICENSE index 45ce934d7ba..e1969f98375 100644 --- a/third_party/VENDOR-LICENSE +++ b/third_party/VENDOR-LICENSE @@ -4493,6 +4493,39 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +=========================================================== +Import: github.com/knative/eventing/vendor/golang.org/x/sync + +Copyright (c) 2009 The Go Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are +met: + + * Redistributions of source code must retain the above copyright +notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above +copyright notice, this list of conditions and the following disclaimer +in the documentation and/or other materials provided with the +distribution. + * Neither the name of Google Inc. nor the names of its +contributors may be used to endorse or promote products derived from +this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + + =========================================================== Import: github.com/knative/eventing/vendor/golang.org/x/sys