Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config/provisioners/in-memory-channel/in-memory-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# ClusterChannelProvisioner for the async "in-memory" channel implementation.
# Note: the paste had stripped the mapping indentation under `metadata`,
# which makes the document invalid YAML; restored here.
apiVersion: eventing.knative.dev/v1alpha1
kind: ClusterChannelProvisioner
metadata:
  name: in-memory
spec: {}

---

apiVersion: eventing.knative.dev/v1alpha1
kind: ClusterChannelProvisioner
metadata:
Expand Down
35 changes: 28 additions & 7 deletions pkg/controller/eventing/inmemory/channel/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,14 @@ import (

const (
finalizerName = controllerAgentName

// Name of the corev1.Events emitted from the reconciliation process
channelReconciled = "ChannelReconciled"
channelUpdateStatusFailed = "ChannelUpdateStatusFailed"
channelConfigSyncFailed = "ChannelConfigSyncFailed"
k8sServiceCreateFailed = "K8sServiceCreateFailed"
virtualServiceCreateFailed = "VirtualServiceCreateFailed"
// defaultProvisionerName is the provisioner whose dispatcher Service is shared
// by both provisioners (see the VirtualService handling in reconcile).
// TODO after in-memory-channel is retired, defaultProvisionerName should be removed
defaultProvisionerName = "in-memory-channel"
)

type reconciler struct {
Expand Down Expand Up @@ -110,7 +111,6 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
r.recorder.Eventf(c, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update Channel's status: %v", err)
return reconcile.Result{}, updateStatusErr
}

return reconcile.Result{}, err
}

Expand Down Expand Up @@ -157,11 +157,26 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
}
c.Status.SetAddress(controller.ServiceHostName(svc.Name, svc.Namespace))

_, err = util.CreateVirtualService(ctx, r.client, c, svc)
if err != nil {
logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err)
return err
if c.Spec.Provisioner.Name == defaultProvisionerName {
_, err = util.CreateVirtualService(ctx, r.client, c, svc)
if err != nil {
logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err)
return err
}
} else {
// We need to have a single dispatcher that is pointed at by _both_
// ClusterChannelProvisioners. So fake the channel, by saying that its provisioner is the
// one with the single dispatcher. The faked provisioner is used only to determine the
// dispatcher Service's name.
cCopy := c.DeepCopy()
cCopy.Spec.Provisioner.Name = defaultProvisionerName
_, err = util.CreateVirtualService(ctx, r.client, cCopy, svc)
if err != nil {
logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err)
return err
}
}

c.Status.MarkProvisioned()
Expand Down Expand Up @@ -225,8 +240,14 @@ func multiChannelFanoutConfig(channels []eventingv1alpha1.Channel) *multichannel
Name: c.Name,
}
if c.Spec.Subscribable != nil {
// TODO After in-memory-channel is retired, this logic must be refactored.
asyncHandler := false
if c.Spec.Provisioner.Name != defaultProvisionerName {
asyncHandler = true
}
channelConfig.FanoutConfig = fanout.Config{
Subscriptions: c.Spec.Subscribable.Subscribers,
AsyncHandler: asyncHandler,
}
}
cc = append(cc, channelConfig)
Expand Down
55 changes: 49 additions & 6 deletions pkg/controller/eventing/inmemory/channel/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import (
)

const (
ccpName = "in-memory-channel"
ccpName = "in-memory-channel"
asyncCCPName = "in-memory"

cNamespace = "test-namespace"
cName = "test-channel"
Expand Down Expand Up @@ -518,6 +519,38 @@ func TestReconcile(t *testing.T) {
events[channelReconciled],
},
},
{
Name: "Channel reconcile successful - Async channel",
// VirtualService should have channel provisioner name
// defaults to in-memory-channel but the service should match provisioner's service name
InitialState: []runtime.Object{
makeChannel("in-memory"),
},
Mocks: controllertesting.Mocks{},
WantPresent: []runtime.Object{
makeVirtualService(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also makeK8sService("in-memory")

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

makeK8sService("in-memory"),
},
WantEvent: []corev1.Event{
events[channelReconciled],
},
},
{
Name: "Channel reconcile successful - Non Async channel",
// VirtualService should have channel provisioner name
// defaults to in-memory-channel
InitialState: []runtime.Object{
makeChannel(),
},
Mocks: controllertesting.Mocks{},
WantPresent: []runtime.Object{
makeVirtualService(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also makeK8sService("in-memory")

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the confusion — I think this should have been makeK8sService("in-memory-channel").

makeK8sService(),
},
WantEvent: []corev1.Event{
events[channelReconciled],
},
},
}

for _, tc := range testCases {
Expand All @@ -541,7 +574,7 @@ func TestReconcile(t *testing.T) {
}
}

func makeChannel() *eventingv1alpha1.Channel {
func makeChannel(pn ...string) *eventingv1alpha1.Channel {
c := &eventingv1alpha1.Channel{
TypeMeta: metav1.TypeMeta{
APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
Expand All @@ -554,14 +587,24 @@ func makeChannel() *eventingv1alpha1.Channel {
},
Spec: eventingv1alpha1.ChannelSpec{
Provisioner: &corev1.ObjectReference{
Name: ccpName,
Name: getProvisionerName(pn),
},
},
}
c.Status.InitializeConditions()
return c
}

// getProvisionerName resolves the provisioner name for a test fixture: the
// first element of pn when one is supplied, otherwise the default ccpName.
func getProvisionerName(pn []string) string {
	if len(pn) > 0 {
		return pn[0]
	}
	return ccpName
}

func makeChannelWithFinalizerAndAddress() *eventingv1alpha1.Channel {
c := makeChannelWithFinalizer()
c.Status.SetAddress(serviceAddress)
Expand Down Expand Up @@ -631,7 +674,7 @@ func makeConfigMapWithVerifyConfigMapData() *corev1.ConfigMap {
return cm
}

func makeK8sService() *corev1.Service {
func makeK8sService(pn ...string) *corev1.Service {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion for a different PR: Seems like this could benefit from the builder pattern for making fixtures with varying aspects. That might make it more clear to the reader what these arguments mean. See builder.go and SubscriptionBuilder.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for pointing that out.

return &corev1.Service{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Expand All @@ -643,8 +686,8 @@ func makeK8sService() *corev1.Service {
Labels: map[string]string{
util.EventingChannelLabel: cName,
util.OldEventingChannelLabel: cName,
util.EventingProvisionerLabel: ccpName,
util.OldEventingProvisionerLabel: ccpName,
util.EventingProvisionerLabel: getProvisionerName(pn),
util.OldEventingProvisionerLabel: getProvisionerName(pn),
},
OwnerReferences: []metav1.OwnerReference{
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@ import (
)

const (
// Name is the name of the in-memory channel ClusterChannelProvisioner.
Name = "in-memory-channel"

// Channel is the name of the Channel resource in eventing.knative.dev/v1alpha1.
Channel = "Channel"

Expand All @@ -48,6 +45,11 @@ const (
k8sServiceDeleteFailed = "K8sServiceDeleteFailed"
)

var (
// provisionerNames lists the names of the ClusterChannelProvisioners served by
// this controller (checked by shouldReconcile).
provisionerNames = []string{"in-memory-channel", "in-memory"}
)

type reconciler struct {
client client.Client
recorder record.EventRecorder
Expand Down Expand Up @@ -130,7 +132,12 @@ func IsControlled(ref *corev1.ObjectReference) bool {
// shouldReconcile determines if this Controller should control (and therefore reconcile) a given
// ClusterChannelProvisioner. This Controller only handles in-memory channels.
func shouldReconcile(namespace, name string) bool {
return namespace == "" && name == Name
for _, p := range provisionerNames {
if namespace == "" && name == p {
return true
}
}
return false
}

func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.ClusterChannelProvisioner) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
ccpUid = "test-uid"
testErrorMessage = "test-induced-error"
testNS = "test-ns"
Name = "in-memory-channel"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/eventing/inmemory/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func main() {
logger := provisioners.NewProvisionerLoggerFromConfig(logConfig)
defer logger.Sync()
logger = logger.With(
zap.String("eventing.knative.dev/clusterChannelProvisioner", clusterchannelprovisioner.Name),
zap.String("eventing.knative.dev/clusterChannelProvisioner", "in-memory"),
zap.String("eventing.knative.dev/clusterChannelProvisionerComponent", "Controller"),
)
flag.Parse()
Expand Down
10 changes: 10 additions & 0 deletions pkg/sidecar/fanout/fanout_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ const (
// Config is the configuration for a fanout.Handler.
type Config struct {
// Subscriptions is the set of subscribers each message is fanned out to.
Subscriptions []eventingduck.ChannelSubscriberSpec `json:"subscriptions"`
// AsyncHandler controls whether the Subscriptions are called synchronously or asynchronously.
// It is expected to be false when used as a sidecar.
AsyncHandler bool `json:"asyncHandler,omitempty"`
}

// http.Handler that takes a single request in and fans it out to N other servers.
Expand Down Expand Up @@ -83,6 +86,13 @@ func NewHandler(logger *zap.Logger, config Config) *Handler {

func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {
return func(_ provisioners.ChannelReference, m *provisioners.Message) error {
if f.config.AsyncHandler {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified existing unit test by adding one test case where AsyncHandler is set true. Let me know if you think it is sufficient.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think anything has been pushed.

go func() {
// Any returned error is already logged in f.dispatch().
_ = f.dispatch(m)
}()
return nil
}
return f.dispatch(m)
}
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/sidecar/fanout/fanout_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) {
subscriber func(http.ResponseWriter, *http.Request)
channel func(http.ResponseWriter, *http.Request)
expectedStatus int
asyncHandler bool
}{
"rejected by receiver": {
receiverFunc: func(provisioners.ChannelReference, *provisioners.Message) error {
Expand Down Expand Up @@ -178,6 +179,28 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) {
},
expectedStatus: http.StatusAccepted,
},
"all subs succeed with async handler": {
subs: []eventingduck.ChannelSubscriberSpec{
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceChannel,
},
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceChannel,
},
{
SubscriberURI: replaceSubscriber,
ReplyURI: replaceChannel,
},
},
subscriber: callableSucceed,
channel: func(writer http.ResponseWriter, _ *http.Request) {
writer.WriteHeader(http.StatusAccepted)
},
expectedStatus: http.StatusAccepted,
asyncHandler: true,
},
}
for n, tc := range testCases {
if n != "fanout times out" {
Expand Down Expand Up @@ -206,6 +229,9 @@ func TestFanoutHandler_ServeHTTP(t *testing.T) {
}

h := NewHandler(zap.NewNop(), Config{Subscriptions: subs})
if tc.asyncHandler {
h.config.AsyncHandler = true
}
if tc.receiverFunc != nil {
h.receiver = provisioners.NewMessageReceiver(tc.receiverFunc, zap.NewNop().Sugar())
}
Expand Down
20 changes: 19 additions & 1 deletion test/e2e-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,24 @@ $(dirname $0)/upload-test-images.sh e2e || fail_test "Error uploading test image
# Setup resources common to all eventing tests
setup_events_test_resources|| fail_test "Error setting up test resources"

go_test_e2e ./test/e2e || fail_test
go_test_e2e ./test/e2e
exit_result=$?
if [ ${exit_result} -ne 0 ]; then
# Collecting logs from all knative's eventing pods
echo "============================================================"
for namespace in "knative-eventing" "e2etestfn3"; do
for pod in $(kubectl get pod -n $namespace | grep Running | awk '{print $1}' ); do
for container in $(kubectl get pod "${pod}" -n $namespace -ojsonpath='{.spec.containers[*].name}'); do
echo "Namespace, Pod, Container: ${namespace}, ${pod}, ${container}"
kubectl logs -n $namespace "${pod}" -c "${container}" || true
echo "----------------------------------------------------------"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also write out that this is a previous incarnation of a Pod. Something like:

echo "Namespace, Pod, Container -- Previous Instance: ${namespace}, ${pod}, ${container}"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

echo "Namespace, Pod, Container (Previous instance): ${namespace}, ${pod}, ${container}"
kubectl logs -p -n $namespace "${pod}" -c "${container}" || true
echo "============================================================"
done
done
done
fail_test
fi

success