Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1bb44c5
pipelines, first cut of types def, generated code
May 5, 2019
30453b4
checkpointing, beginning of pipeline reconciler, does not workie
May 7, 2019
97ae552
types into messaging + clients
May 15, 2019
f248e2d
cleanup
May 15, 2019
d52c7a6
pr feedback, simple proposal for ChannelCRD Specs
May 16, 2019
45c2bb2
validation tests, more unit tests, still mosdef wip
May 28, 2019
e79d6c8
use typemeta/objectmeta instead of object ref. matches podtemplatespe…
May 29, 2019
8ce3fde
latest pkg and move to new controller sig
May 29, 2019
6115fe0
use the new controller style
May 29, 2019
c8de5f4
nothing
May 29, 2019
b4b30ac
use namespace for dynamic client, duh
May 29, 2019
2b25f22
just a test to wire things together, creates IMC
May 29, 2019
a6cefac
reconcile subscription objects
May 29, 2019
4536614
check that the subscription gets created
May 30, 2019
6916782
add multistep pipeline test
May 30, 2019
beb2d0f
fix the sock
May 30, 2019
7c3b696
checkpoint before pulling in channelable duck
May 30, 2019
d1f9aeb
pulling in channelable
May 30, 2019
6f7f20d
update statuses + more tests
May 31, 2019
f4326a8
add lifecycle tests
Jun 3, 2019
3eeffe8
fix unit for reconciler (status)
Jun 3, 2019
f46ac9f
moar tests
Jun 3, 2019
336feb0
add reply-to as the last stage
Jun 4, 2019
16512f3
tests, wire up the reply
Jun 4, 2019
7278ea2
remove cruft, fixed TODOs
Jun 4, 2019
17fca8e
fill in addressable, watch subscriptions and channel CRD via tracker
Jun 5, 2019
6a51003
more tests explicitly for setAddress
Jun 5, 2019
9f39d0a
remove commented out cruft
Jun 5, 2019
a562380
address pr feedback. populate channelstatuses
Jun 6, 2019
d373826
run update-codegen
Jun 6, 2019
59f9b16
add a todo, return an error when create sub fails
Jun 6, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/knative/eventing/pkg/reconciler/channel"
"github.com/knative/eventing/pkg/reconciler/eventtype"
"github.com/knative/eventing/pkg/reconciler/namespace"
"github.com/knative/eventing/pkg/reconciler/pipeline"
"github.com/knative/eventing/pkg/reconciler/subscription"
"github.com/knative/eventing/pkg/reconciler/trigger"
"github.com/knative/pkg/configmap"
Expand Down Expand Up @@ -81,7 +82,7 @@ func main() {

logger.Info("Starting the controller")

const numControllers = 6
const numControllers = 7
cfg.QPS = numControllers * rest.DefaultQPS
cfg.Burst = numControllers * rest.DefaultBurst
opt := reconciler.NewOptionsOrDie(cfg, logger, stopCh)
Expand All @@ -97,6 +98,9 @@ func main() {
brokerInformer := eventingInformerFactory.Eventing().V1alpha1().Brokers()
eventTypeInformer := eventingInformerFactory.Eventing().V1alpha1().EventTypes()

// Messaging
pipelineInformer := eventingInformerFactory.Messaging().V1alpha1().Pipelines()

// Kube
serviceInformer := kubeInformerFactory.Core().V1().Services()
namespaceInformer := kubeInformerFactory.Core().V1().Namespaces()
Expand Down Expand Up @@ -165,6 +169,12 @@ func main() {
eventTypeInformer,
brokerInformer,
),
pipeline.NewController(
opt,
pipelineInformer,
addressableInformer,
subscriptionInformer,
),
}
// This line asserts at compile time that the length of controllers is equal to numControllers.
// It is based on https://go101.org/article/tips.html#assert-at-compile-time, which notes that
Expand All @@ -191,6 +201,7 @@ func main() {
subscriptionInformer.Informer(),
triggerInformer.Informer(),
eventTypeInformer.Informer(),
pipelineInformer.Informer(),
// Kube
configMapInformer.Informer(),
serviceInformer.Informer(),
Expand Down
10 changes: 10 additions & 0 deletions config/200-controller-clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ rules:
verbs:
- "update"

# Our own resources and statuses we care about.
- apiGroups:
- "messaging.knative.dev"
resources:
- "pipelines"
- "pipelines/status"
- "inmemorychannels"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will need to be an aggregated role. Similar to #1341, but include all permissions, not just read and update.

- "inmemorychannels/status"
verbs: *everything

# Source resources and statuses we care about.
- apiGroups:
- "sources.eventing.knative.dev"
Expand Down
96 changes: 96 additions & 0 deletions config/300-pipeline.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
# Copyright 2019 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: pipelines.messaging.knative.dev
labels:
knative.dev/crd-install: "true"
spec:
group: messaging.knative.dev
version: v1alpha1
names:
kind: Pipeline
plural: pipelines
singular: pipeline
categories:
- all
- knative
- eventing
Comment thread
vaikas marked this conversation as resolved.
- messaging
scope: Namespaced
subresources:
status: {}
additionalPrinterColumns:
- name: Ready
type: string
JSONPath: ".status.conditions[?(@.type==\"Ready\")].status"
- name: Reason
type: string
JSONPath: ".status.conditions[?(@.type==\"Ready\")].reason"
- name: Age
type: date
JSONPath: .metadata.creationTimestamp
validation:
openAPIV3Schema:
properties:
spec:
required:
- steps
- channelTemplate
properties:
steps:
type: array
items:
type: object
properties:
dnsName:
type: string
minLength: 1
uri:
type: string
minLength: 1
ref:
type: object
required:
- apiVersion
- kind
- name
properties:
apiVersion:
type: string
minLength: 1
kind:
type: string
minLength: 1
name:
type: string
minLength: 1
namespace:
type: string
maxLength: 0
channelTemplate:
type: object
required:
- apiVersion
- kind
properties:
apiVersion:
type: string
minLength: 1
kind:
type: string
minLength: 1
spec:
type: object
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this get validated?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can't in the openapi doc. You could if you loaded the channel template crd. That can be done in the future.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, so it should fail now at CR creation time, so while we can't easily ( as Scott said) validate the target channel, at least when we try to create it will fail instead of previous provisioner model where the channel would get created and then fail at a later time. I'd like to not tackle this as part of this PR. One option would be to use dry run. Regardless, created this issue to track that, ok?
#1349

4 changes: 2 additions & 2 deletions pkg/apis/eventing/v1alpha1/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&ChannelList{},
&ClusterChannelProvisioner{},
&ClusterChannelProvisionerList{},
&EventType{},
&EventTypeList{},
&Subscription{},
&SubscriptionList{},
&Trigger{},
&TriggerList{},
&EventType{},
&EventTypeList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func isChannelEmpty(f corev1.ObjectReference) bool {

// Valid if it is a valid object reference.
func isValidChannel(f corev1.ObjectReference) *apis.FieldError {
return isValidObjectReference(f)
return IsValidObjectReference(f)
}

func isValidObjectReference(f corev1.ObjectReference) *apis.FieldError {
func IsValidObjectReference(f corev1.ObjectReference) *apis.FieldError {
return checkRequiredObjectReferenceFields(f).
Also(checkDisallowedObjectReferenceFields(f))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestIsValidObjectReference(t *testing.T) {
for _, fe := range test.want {
allWanted = allWanted.Also(fe)
}
got := isValidObjectReference(test.ref)
got := IsValidObjectReference(test.ref)
if diff := cmp.Diff(allWanted.Error(), got.Error()); diff != "" {
t.Errorf("%s: validation (-want, +got) = %v", test.name, diff)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/apis/eventing/v1alpha1/subscription_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (ss *SubscriptionSpec) Validate(ctx context.Context) *apis.FieldError {
}

if !missingSubscriber {
if fe := isValidSubscriberSpec(*ss.Subscriber); fe != nil {
if fe := IsValidSubscriberSpec(*ss.Subscriber); fe != nil {
errs = errs.Also(fe.ViaField("subscriber"))
}
}
Expand All @@ -78,7 +78,7 @@ func isSubscriberSpecNilOrEmpty(s *SubscriberSpec) bool {
return false
}

func isValidSubscriberSpec(s SubscriberSpec) *apis.FieldError {
func IsValidSubscriberSpec(s SubscriberSpec) *apis.FieldError {
var errs *apis.FieldError

fieldsSet := make([]string, 0, 0)
Expand All @@ -99,7 +99,7 @@ func isValidSubscriberSpec(s SubscriberSpec) *apis.FieldError {

// If Ref given, check the fields.
if s.Ref != nil && !equality.Semantic.DeepEqual(s.Ref, &corev1.ObjectReference{}) {
fe := isValidObjectReference(*s.Ref)
fe := IsValidObjectReference(*s.Ref)
if fe != nil {
errs = errs.Also(fe.ViaField("ref"))
}
Expand All @@ -112,7 +112,7 @@ func isReplyStrategyNilOrEmpty(r *ReplyStrategy) bool {
}

func isValidReply(r ReplyStrategy) *apis.FieldError {
if fe := isValidObjectReference(*r.Channel); fe != nil {
if fe := IsValidObjectReference(*r.Channel); fe != nil {
return fe.ViaField("channel")
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/eventing/v1alpha1/subscription_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func TestValidgetValidSubscriber(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
got := isValidSubscriberSpec(test.s)
got := IsValidSubscriberSpec(test.s)
if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" {
t.Errorf("%s: isValidSubscriber (-want, +got) = %v", test.name, diff)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/eventing/v1alpha1/trigger_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (ts *TriggerSpec) Validate(ctx context.Context) *apis.FieldError {
if isSubscriberSpecNilOrEmpty(ts.Subscriber) {
fe := apis.ErrMissingField("subscriber")
errs = errs.Also(fe)
} else if fe := isValidSubscriberSpec(*ts.Subscriber); fe != nil {
} else if fe := IsValidSubscriberSpec(*ts.Subscriber); fe != nil {
errs = errs.Also(fe.ViaField("subscriber"))
}

Expand Down
27 changes: 27 additions & 0 deletions pkg/apis/messaging/v1alpha1/pipeline_defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import "context"

func (s *Pipeline) SetDefaults(ctx context.Context) {
s.Spec.SetDefaults(ctx)
}

func (ss *PipelineSpec) SetDefaults(ctx context.Context) {
// TODO anything?
}
Loading