Skip to content
Closed
2 changes: 2 additions & 0 deletions cmd/controller/controller-runtime-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/knative/eventing/pkg/controller/eventing/subscription"
"github.com/knative/eventing/pkg/controller/feed"
"github.com/knative/eventing/pkg/controller/flow"
containercontroller "github.com/knative/eventing/pkg/provisioners/container/controller"
"go.uber.org/zap"
"strings"

Expand All @@ -49,6 +50,7 @@ type ProvideFunc func(manager.Manager) (controller.Controller, error)
// be added to the default providers list.
var ExperimentalControllers = map[string]ProvideFunc{
"subscription.eventing.knative.dev": subscription.ProvideController,
"container-provisioner": containercontroller.ProvideController,
}

// controllerRuntimeStart runs controllers written for controller-runtime. It's
Expand Down
2 changes: 1 addition & 1 deletion config/500-controller.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ spec:
args: [
"-logtostderr",
"-stderrthreshold", "INFO",
"--experimentalControllers=subscription.eventing.knative.dev" # comma separated list.
"--experimentalControllers=subscription.eventing.knative.dev,container-provisioner" # comma separated list.
]
volumeMounts:
- name: config-logging
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/eventing/v1alpha1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&ChannelList{},
&ClusterProvisioner{},
&ClusterProvisionerList{},
&Source{},
&SourceList{},
&Subscription{},
&SubscriptionList{},
)
Expand Down
38 changes: 38 additions & 0 deletions pkg/apis/eventing/v1alpha1/source_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1

import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/knative/pkg/apis/duck"
Expand Down Expand Up @@ -111,6 +112,12 @@ type SourceStatus struct {
// +patchStrategy=merge
Conditions duckv1alpha1.Conditions `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`

// Provisioned holds the status of a Provisioned Object at a point in time.
// +optional
// +patchMergeKey=name
// +patchStrategy=merge
Provisioned []ProvisionedObjectStatus `json:"provisioned,omitempty" patchStrategy:"merge" patchMergeKey:"name"`

// ObservedGeneration is the 'Generation' of the Source that
// was last reconciled by the controller.
// +optional
Expand All @@ -120,6 +127,17 @@ type SourceStatus struct {
Subscribable duckv1alpha1.Subscribable `json:"subscribable,omitempty"`
}

type ProvisionedObjectStatus struct {
// Name of Object
Name string `json:"name,omitempty"`
// Type is the fully qualified object type.
Type string `json:"type,omitempty"`
// Status is the current relationship between Source and Object.
Status string `json:"status,omitempty"`
// Reason is the detailed description describing current relationship status.
Reason string `json:"reason,omitempty"`
}

// GetCondition returns the condition currently associated with the given type, or nil.
func (ss *SourceStatus) GetCondition(t duckv1alpha1.ConditionType) *duckv1alpha1.Condition {
return sourceCondSet.Manage(ss).GetCondition(t)
Expand All @@ -145,6 +163,26 @@ func (ss *SourceStatus) MarkDeprovisioned(reason, messageFormat string, messageA
sourceCondSet.Manage(ss).MarkFalse(SourceConditionProvisioned, reason, messageFormat, messageA...)
}

// MarkProvisioned sets the condition that the source has had its backing resources created.
func (ss *SourceStatus) SetProvisionedObjectState(name, objType, status, reasonFormat string, reasonA ...interface{}) {
reason := fmt.Sprintf(reasonFormat, reasonA...)
newP := ProvisionedObjectStatus{
Name: name,
Type: objType,
Status: status,
Reason: reason,
}
newProvisioned := make([]ProvisionedObjectStatus, 0, len(ss.Provisioned))
for _, p := range ss.Provisioned {
if p.Name == newP.Name {
newProvisioned = append(newProvisioned, newP)
} else {
newProvisioned = append(newProvisioned, p)
}
}
ss.Provisioned = newProvisioned
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// SourceList is a list of Source resources
Expand Down
21 changes: 21 additions & 0 deletions pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 5 additions & 40 deletions pkg/controller/eventing/subscription/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,9 @@ package subscription

import (
"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/knative/eventing/pkg/controller/sdk"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

const (
Expand All @@ -35,44 +29,15 @@ const (
controllerAgentName = "subscription-controller"
)

type reconciler struct {
client client.Client
restConfig *rest.Config
dynamicClient dynamic.Interface
recorder record.EventRecorder
}

// Verify the struct implements reconcile.Reconciler
var _ reconcile.Reconciler = &reconciler{}

// ProvideController returns a Subscription controller.
func ProvideController(mgr manager.Manager) (controller.Controller, error) {
// Setup a new controller to Reconcile Subscriptions.
c, err := controller.New(controllerAgentName, mgr, controller.Options{
p := &sdk.Provider{
AgentName: controllerAgentName,
Parent: &v1alpha1.Subscription{},
Reconciler: &reconciler{
recorder: mgr.GetRecorder(controllerAgentName),
},
})
if err != nil {
return nil, err
}

// Watch Subscription events and enqueue Subscription object key.
if err := c.Watch(&source.Kind{Type: &v1alpha1.Subscription{}}, &handler.EnqueueRequestForObject{}); err != nil {
return nil, err
}

return c, nil
}

func (r *reconciler) InjectClient(c client.Client) error {
r.client = c
return nil
}

func (r *reconciler) InjectConfig(c *rest.Config) error {
r.restConfig = c
var err error
r.dynamicClient, err = dynamic.NewForConfig(c)
return err
return p.ProvideController(mgr)
}
82 changes: 30 additions & 52 deletions pkg/controller/eventing/subscription/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,52 +25,37 @@ import (
duckapis "github.com/knative/pkg/apis"
"github.com/knative/pkg/apis/duck"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
"github.com/knative/pkg/logging"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

type reconciler struct {
client client.Client
restConfig *rest.Config
dynamicClient dynamic.Interface
recorder record.EventRecorder
}

// Reconcile compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Subscription resource
// with the current status of the resource.
func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
glog.Infof("Reconciling subscription %v", request)
subscription := &v1alpha1.Subscription{}
err := r.client.Get(context.TODO(), request.NamespacedName, subscription)

if errors.IsNotFound(err) {
glog.Errorf("could not find subscription %v\n", request)
return reconcile.Result{}, nil
}

if err != nil {
glog.Errorf("could not fetch Subscription %v for %+v\n", err, request)
return reconcile.Result{}, err
}
func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) (runtime.Object, error) {
logger := logging.FromContext(ctx)

original := subscription.DeepCopy()

// Reconcile this copy of the Subscription and then write back any status
// updates regardless of whether the reconcile error out.
err = r.reconcile(subscription)
if equality.Semantic.DeepEqual(original.Status, subscription.Status) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if _, err := r.updateStatus(subscription); err != nil {
glog.Warningf("Failed to update subscription status: %v", err)
return reconcile.Result{}, err
subscription, ok := object.(*v1alpha1.Subscription)
if !ok {
logger.Errorf("could not find subscription %v\n", object)
return object, nil
}

// Requeue if the resource is not ready:
return reconcile.Result{}, err
return subscription, r.reconcile(subscription)
}

func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error {
Expand Down Expand Up @@ -140,25 +125,6 @@ func (r *reconciler) reconcile(subscription *v1alpha1.Subscription) error {
return nil
}

func (r *reconciler) updateStatus(subscription *v1alpha1.Subscription) (*v1alpha1.Subscription, error) {
newSubscription := &v1alpha1.Subscription{}
err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: subscription.Namespace, Name: subscription.Name}, newSubscription)

if err != nil {
return nil, err
}
newSubscription.Status = subscription.Status

// Until #38113 is merged, we must use Update instead of UpdateStatus to
// update the Status block of the Subscription resource. UpdateStatus will not
// allow changes to the Spec of the resource, which is ideal for ensuring
// nothing other than resource status has been updated.
if err = r.client.Update(context.TODO(), newSubscription); err != nil {
return nil, err
}
return newSubscription, nil
}

// resolveCall resolves the Spec.Call object. If it's an ObjectReference will resolve the object
// and treat it as a Targetable.If it's TargetURI then it's used as is.
// TODO: Once Service Routes, etc. support Targetable, use that.
Expand Down Expand Up @@ -303,3 +269,15 @@ func (r *reconciler) CreateResourceInterface(namespace string, ref *corev1.Objec
return rc.Namespace(namespace), nil

}

func (r *reconciler) InjectClient(c client.Client) error {
r.client = c
return nil
}

func (r *reconciler) InjectConfig(c *rest.Config) error {
r.restConfig = c
var err error
r.dynamicClient, err = dynamic.NewForConfig(c)
return err
}
Loading