Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions pkg/apis/sources/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ const (
ApiServerSourceDeleteRefEventType = "dev.knative.apiserver.ref.delete"
)

// ApiServerSourceEventTypes is the list of CloudEvent types the ApiServerSource emits.
var ApiServerSourceEventTypes = []string{
ApiServerSourceAddEventType,
ApiServerSourceDeleteEventType,
ApiServerSourceUpdateEventType,
// ApiServerSourceEventReferenceModeTypes is the list of CloudEvent types the ApiServerSource with EventMode of ReferenceMode emits.
var ApiServerSourceEventReferenceModeTypes = []string{
ApiServerSourceAddRefEventType,
ApiServerSourceDeleteRefEventType,
ApiServerSourceUpdateRefEventType,
}

// ApiServerSourceEventResourceModeTypes is the list of CloudEvent types the ApiServerSource with EventMode of ResourceMode emits.
var ApiServerSourceEventResourceModeTypes = []string{
ApiServerSourceAddEventType,
ApiServerSourceDeleteEventType,
ApiServerSourceUpdateEventType,
}
24 changes: 18 additions & 6 deletions pkg/reconciler/apiserversource/apiserversource.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ const (
// Name of the corev1.Events emitted from the reconciliation process
apiserversourceDeploymentCreated = "ApiServerSourceDeploymentCreated"
apiserversourceDeploymentUpdated = "ApiServerSourceDeploymentUpdated"
apiserversourceDeploymentDeleted = "ApiServerSourceDeploymentDeleted"

component = "apiserversource"
)
Expand Down Expand Up @@ -112,7 +111,12 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour
}
source.Status.PropagateDeploymentAvailability(ra)

source.Status.CloudEventAttributes = r.createCloudEventAttributes()
cloudEventAttributes, err := r.createCloudEventAttributes(source)
Comment thread
capri-xiyue marked this conversation as resolved.
if err != nil {
logging.FromContext(ctx).Errorw("Unable to create CloudEventAttributes", zap.Error(err))
return err
}
source.Status.CloudEventAttributes = cloudEventAttributes

return nil
}
Expand Down Expand Up @@ -242,13 +246,21 @@ func (r *Reconciler) runAccessCheck(ctx context.Context, src *v1.ApiServerSource

}

func (r *Reconciler) createCloudEventAttributes() []duckv1.CloudEventAttributes {
ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(apisources.ApiServerSourceEventTypes))
for _, apiServerSourceType := range apisources.ApiServerSourceEventTypes {
func (r *Reconciler) createCloudEventAttributes(src *v1.ApiServerSource) ([]duckv1.CloudEventAttributes, error) {
var eventTypes []string
if src.Spec.EventMode == v1.ReferenceMode {
eventTypes = apisources.ApiServerSourceEventReferenceModeTypes
} else if src.Spec.EventMode == v1.ResourceMode {
eventTypes = apisources.ApiServerSourceEventResourceModeTypes
} else {
return []duckv1.CloudEventAttributes{}, fmt.Errorf("no EventType available for EventMode: %s", src.Spec.EventMode)
}
ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(eventTypes))
for _, apiServerSourceType := range eventTypes {
ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{
Type: apiServerSourceType,
Source: r.ceSource,
})
}
return ceAttributes
return ceAttributes, nil
}
98 changes: 91 additions & 7 deletions pkg/reconciler/apiserversource/apiserversource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,57 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceDeployed,
rttestingv1.WithApiServerSourceSink(sinkURI),
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceEventTypes(source),
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
),
}},
WantCreates: []runtime.Object{
makeSubjectAccessReview("namespaces", "get", "default"),
makeSubjectAccessReview("namespaces", "list", "default"),
makeSubjectAccessReview("namespaces", "watch", "default"),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Name: "valid with eventmode of resourcemode",
Objects: []runtime.Object{
rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Resources: []sourcesv1.APIVersionKindSelector{{
APIVersion: "v1",
Kind: "Namespace",
}},
EventMode: sourcesv1.ResourceMode,
SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
}),
rttestingv1.WithApiServerSourceUID(sourceUID),
rttestingv1.WithApiServerSourceObjectMetaGeneration(generation),
),
rttestingv1.NewChannel(sinkName, testNS,
rttestingv1.WithInitChannelConditions,
rttestingv1.WithChannelAddress(sinkDNS),
),
makeAvailableReceiveAdapterWithEventMode(t, sourcesv1.ResourceMode),
},
Key: testNS + "/" + sourceName,
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Resources: []sourcesv1.APIVersionKindSelector{{
APIVersion: "v1",
Kind: "Namespace",
}},
EventMode: sourcesv1.ResourceMode,
SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
}),
rttestingv1.WithApiServerSourceUID(sourceUID),
rttestingv1.WithApiServerSourceObjectMetaGeneration(generation),
// Status Update:
rttestingv1.WithInitApiServerSourceConditions,
rttestingv1.WithApiServerSourceDeployed,
rttestingv1.WithApiServerSourceSink(sinkURI),
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceResourceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
),
}},
Expand Down Expand Up @@ -232,7 +282,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceDeployed,
rttestingv1.WithApiServerSourceSink(sinkURI),
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceEventTypes(source),
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
),
}},
Expand Down Expand Up @@ -290,7 +340,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceDeployed,
rttestingv1.WithApiServerSourceSink(sinkTargetURI),
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceEventTypes(source),
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
),
}},
Expand Down Expand Up @@ -340,7 +390,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithInitApiServerSourceConditions,
rttestingv1.WithApiServerSourceSink(sinkURI),
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceEventTypes(source),
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceDeploymentUnavailable,
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
),
Expand Down Expand Up @@ -401,7 +451,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceDeploymentUnavailable,
rttestingv1.WithApiServerSourceSink(sinkURI),
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceEventTypes(source),
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
),
}},
Expand Down Expand Up @@ -455,7 +505,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceDeploymentUnavailable,
rttestingv1.WithApiServerSourceSink(sinkURI),
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceEventTypes(source),
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
),
}},
Expand Down Expand Up @@ -506,7 +556,7 @@ func TestReconcile(t *testing.T) {
rttestingv1.WithApiServerSourceDeployed,
rttestingv1.WithApiServerSourceSink(sinkURI),
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceEventTypes(source),
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
),
}},
Expand Down Expand Up @@ -613,6 +663,40 @@ func makeAvailableReceiveAdapterWithTargetURI(t *testing.T) *appsv1.Deployment {
return ra
}

func makeAvailableReceiveAdapterWithEventMode(t *testing.T, eventMode string) *appsv1.Deployment {
t.Helper()

src := rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Resources: []sourcesv1.APIVersionKindSelector{{
APIVersion: "v1",
Kind: "Namespace",
}},
EventMode: eventMode,
SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
}),
rttestingv1.WithApiServerSourceUID(sourceUID),
// Status Update:
rttestingv1.WithInitApiServerSourceConditions,
rttestingv1.WithApiServerSourceDeployed,
rttestingv1.WithApiServerSourceSink(sinkURI),
)

args := resources.ReceiveAdapterArgs{
Image: image,
Source: src,
Labels: resources.Labels(sourceName),
SinkURI: sinkURI.String(),
Configs: &reconcilersource.EmptyVarsGenerator{},
}

ra, err := resources.MakeReceiveAdapter(&args)
require.NoError(t, err)

rttesting.WithDeploymentAvailable()(ra)
return ra
}

func makeReceiveAdapterWithDifferentEnv(t *testing.T) *appsv1.Deployment {
ra := makeReceiveAdapter(t)
ra.Spec.Template.Spec.Containers[0].Env = append(ra.Spec.Template.Spec.Containers[0].Env, corev1.EnvVar{
Expand Down
1 change: 0 additions & 1 deletion pkg/reconciler/mtbroker/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ const (
subscriptionDeleteFailed = "SubscriptionDeleteFailed"
subscriptionCreateFailed = "SubscriptionCreateFailed"
subscriptionGetFailed = "SubscriptionGetFailed"
subscriptionDeleted = "SubscriptionDeleted"
)

type Reconciler struct {
Expand Down
19 changes: 16 additions & 3 deletions pkg/reconciler/testing/v1/apiserversouce.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,23 @@ func WithApiServerSourceDeployed(s *v1.ApiServerSource) {
s.Status.PropagateDeploymentAvailability(testing.NewDeployment("any", "any", testing.WithDeploymentAvailable()))
}

func WithApiServerSourceEventTypes(source string) ApiServerSourceOption {
func WithApiServerSourceReferenceModeEventTypes(source string) ApiServerSourceOption {
return func(s *v1.ApiServerSource) {
ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(apisources.ApiServerSourceEventTypes))
for _, apiServerSourceType := range apisources.ApiServerSourceEventTypes {
ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(apisources.ApiServerSourceEventReferenceModeTypes))
for _, apiServerSourceType := range apisources.ApiServerSourceEventReferenceModeTypes {
ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{
Type: apiServerSourceType,
Source: source,
})
}
s.Status.CloudEventAttributes = ceAttributes
}
}

func WithApiServerSourceResourceModeEventTypes(source string) ApiServerSourceOption {
return func(s *v1.ApiServerSource) {
ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(apisources.ApiServerSourceEventResourceModeTypes))
for _, apiServerSourceType := range apisources.ApiServerSourceEventResourceModeTypes {
ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{
Type: apiServerSourceType,
Source: source,
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/source_api_server_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,14 +247,14 @@ func TestApiServerSourceV1EventTypes(t *testing.T) {
// wait for all test resources to be ready
client.WaitForAllTestResourcesReadyOrFail(ctx)

eventTypes, err := waitForEventTypes(ctx, client, len(sources.ApiServerSourceEventTypes))
eventTypes, err := waitForEventTypes(ctx, client, len(sources.ApiServerSourceEventReferenceModeTypes))
if err != nil {
t.Fatalf("Waiting for EventTypes: %v", err)
}
expectedCeTypes := sets.NewString(sources.ApiServerSourceEventTypes...)
expectedCeTypes := sets.NewString(sources.ApiServerSourceEventReferenceModeTypes...)
for _, et := range eventTypes {
if !expectedCeTypes.Has(et.Spec.Type) {
t.Fatalf("Invalid spec.type for ApiServerSource EventType, expected one of: %v, got: %s", sources.ApiServerSourceEventTypes, et.Spec.Type)
t.Fatalf("Invalid spec.type for ApiServerSource EventType, expected one of: %v, got: %s", sources.ApiServerSourceEventReferenceModeTypes, et.Spec.Type)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/source_api_server_v1alpha2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,14 +245,14 @@ func TestApiServerSourceV1Alpha2EventTypes(t *testing.T) {
client.WaitForAllTestResourcesReadyOrFail(ctx)

// Verify that EventTypes were created.
eventTypes, err := waitForEventTypes(ctx, client, len(sources.ApiServerSourceEventTypes))
eventTypes, err := waitForEventTypes(ctx, client, len(sources.ApiServerSourceEventReferenceModeTypes))
if err != nil {
t.Fatalf("Waiting for EventTypes: %v", err)
}
expectedCeTypes := sets.NewString(sources.ApiServerSourceEventTypes...)
expectedCeTypes := sets.NewString(sources.ApiServerSourceEventReferenceModeTypes...)
for _, et := range eventTypes {
if !expectedCeTypes.Has(et.Spec.Type) {
t.Fatalf("Invalid spec.type for ApiServerSource EventType, expected one of: %v, got: %s", sources.ApiServerSourceEventTypes, et.Spec.Type)
t.Fatalf("Invalid spec.type for ApiServerSource EventType, expected one of: %v, got: %s", sources.ApiServerSourceEventReferenceModeTypes, et.Spec.Type)
}
}
}
6 changes: 3 additions & 3 deletions test/e2e/source_api_server_v1beta1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,14 +243,14 @@ func TestApiServerSourceV1Beta1EventTypes(t *testing.T) {
// wait for all test resources to be ready
client.WaitForAllTestResourcesReadyOrFail(ctx)

eventTypes, err := waitForEventTypes(ctx, client, len(sources.ApiServerSourceEventTypes))
eventTypes, err := waitForEventTypes(ctx, client, len(sources.ApiServerSourceEventReferenceModeTypes))
if err != nil {
t.Fatalf("Waiting for EventTypes: %v", err)
}
expectedCeTypes := sets.NewString(sources.ApiServerSourceEventTypes...)
expectedCeTypes := sets.NewString(sources.ApiServerSourceEventReferenceModeTypes...)
for _, et := range eventTypes {
if !expectedCeTypes.Has(et.Spec.Type) {
t.Fatalf("Invalid spec.type for ApiServerSource EventType, expected one of: %v, got: %s", sources.ApiServerSourceEventTypes, et.Spec.Type)
t.Fatalf("Invalid spec.type for ApiServerSource EventType, expected one of: %v, got: %s", sources.ApiServerSourceEventReferenceModeTypes, et.Spec.Type)
}
}
}