Skip to content

refactor in-memory channel to support sync and async handlers#708

Merged
knative-prow-robot merged 7 commits into
knative:masterfrom
sbezverk:async
Feb 4, 2019
Merged

refactor in-memory channel to support sync and async handlers#708
knative-prow-robot merged 7 commits into
knative:masterfrom
sbezverk:async

Conversation

@sbezverk
Copy link
Copy Markdown
Contributor

@sbezverk sbezverk commented Jan 9, 2019

Signed-off-by: Serguei Bezverkhi sbezverk@cisco.com

This PR add support of async way of calling fanout handler and refactors in-memory-channel to support for both types, sync(old) and async(new)

@knative-prow-robot knative-prow-robot added the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Jan 9, 2019
@knative-prow-robot knative-prow-robot added the size/L Denotes a PR that changes 100-499 lines, ignoring generated files. label Jan 9, 2019
@googlebot googlebot added the cla: yes Indicates the PR's author has signed the CLA. label Jan 9, 2019
@sbezverk
Copy link
Copy Markdown
Contributor Author

sbezverk commented Jan 9, 2019

/assign @Harwayne

Copy link
Copy Markdown
Contributor

@Harwayne Harwayne left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a great start.

return err
}
config := multiChannelFanoutConfig(channels)
// Temporary adding logger
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intended to be committed?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a temporary, as I am debugging Async e2e case failure, will be removed before the final commit.

Comment thread pkg/sidecar/fanout/fanout_handler.go Outdated
// Configuration for a fanout.Handler.
type Config struct {
Subscriptions []eventingduck.ChannelSubscriberSpec `json:"subscriptions"`
AsyncHandler bool `json:"asyncHandler"`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preference is Async, rather than AsyncHandler.

It should probably omit on empty as well.

Async bool `json:"async,omitempty"`

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can change but just Async sounds a bit too ambiguous, where AsyncHandler clearly states what that is. Please let me know if you still want me to change it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a minor preference for async, but I can see why you prefer asyncHandler. Unless someone else chimes in, leave it as-is.

Comment thread pkg/sidecar/fanout/fanout_handler.go Outdated
// the `sink` portions of the subscription.
func (f *Handler) makeFanoutRequest(m provisioners.Message, sub eventingduck.ChannelSubscriberSpec) error {
if f.config.AsyncHandler {
f.logger.Info("><SB> Exercising ASYNC path in Fanout Request for", zap.Any("m = ", m), zap.Any("sub = ", sub))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably shouldn't be committed. If is committed, should be Debug level.

Also, zap.Any("m", m) is sufficient (as compared to "m = "), because the string is used as a key in the printed JSON log.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will be removed completely after debugging is concluded

Comment thread pkg/sidecar/fanout/fanout_handler.go Outdated
// Configuration for a fanout.Handler.
type Config struct {
Subscriptions []eventingduck.ChannelSubscriberSpec `json:"subscriptions"`
AsyncHandler bool `json:"asyncHandler"`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add documentation, something like:

// AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. It is expected to be false when used as a sidecar.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

if c.Spec.Subscribable != nil {
// TODO After in-memory-channel is retired, this logic must be refactored.
asyncHandler := false
if c.Spec.Provisioner.Name == asyncProvisionerName {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a unit test to verify the state of asyncHandler.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will do.

@sbezverk sbezverk changed the title WIP refactor in-memory channel to support sync and async handlers refactor in-memory channel to support sync and async handlers Jan 11, 2019
@knative-prow-robot knative-prow-robot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label Jan 11, 2019
Comment thread pkg/sidecar/fanout/fanout_handler.go Outdated
// the `sink` portions of the subscription.
func (f *Handler) makeFanoutRequest(m provisioners.Message, sub eventingduck.ChannelSubscriberSpec) error {
if f.config.AsyncHandler {
go f.dispatcher.DispatchMessage(&m, sub.SubscriberURI, sub.ReplyURI, provisioners.DispatchDefaults{})
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we will drop all errors with this. I think you could wrap in an anon function to log the error at least:

go func () {
  if err := f.dispatcher....; err != nil {
    log err
  } 
  
}

@knative-prow-robot knative-prow-robot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/L Denotes a PR that changes 100-499 lines, ignoring generated files. labels Jan 28, 2019
Comment thread .gitattributes

**/zz_generated.*.go linguist-generated=true
/pkg/client/** linguist-generated=true
/contrib/*/pkg/client/** linguist-generated=true
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please ignore, merging different branches issue. Just resolved now.

@n3wscott
Copy link
Copy Markdown
Contributor

It looks like this PR needs to be rebased on master, it contains a lot of commits that are not related

Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
@knative-prow-robot knative-prow-robot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. labels Jan 28, 2019
Copy link
Copy Markdown
Contributor

@Harwayne Harwayne left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good in general, some small comments before I LGTM.

However, we need to make sure to test this manually before submitting it. Our e2e tests should be good enough that we can rely on them, but I don't feel that is true right now.

Comment thread test/e2e-tests.sh Outdated
echo "========== Collecting logs for pod: "${pod}" in $namespace ================="
for container in $(kubectl get pod "${pod}" -n $namespace -ojsonpath='{.spec.containers[*].name}'); do
echo "----------------------------------------------------------"
echo "Container: "${container}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preference is to include all the pieces on one line (preferably in an easily searchable way). Something like:

echo "Namespace, Pod, Container: ${namespace}, ${pod}, ${container}"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment thread test/e2e-tests.sh
echo "----------------------------------------------------------"
echo "Container: "${container}
kubectl logs -n $namespace "${pod}" -c "${container}" || true
echo "----------------------------------------------------------"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also write out that this is a previous incarnation of a Pod. Something like:

echo "Namespace, Pod, Container -- Previous Instance: ${namespace}, ${pod}, ${container}"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


func createReceiverFunction(f *Handler) func(provisioners.ChannelReference, *provisioners.Message) error {
return func(_ provisioners.ChannelReference, m *provisioners.Message) error {
if f.config.AsyncHandler {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unit test.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Modified existing unit test by adding one test case where AsyncHandler is set true. Let me know if you think it is sufficient.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think anything has been pushed.

logger.Info("Error creating the Virtual Service for the Channel", zap.Error(err))
r.recorder.Eventf(c, corev1.EventTypeWarning, virtualServiceCreateFailed, "Failed to reconcile Virtual Service for the Channel: %v", err)
return err
if c.Spec.Provisioner.Name == asyncProvisionerName {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are both paths of this 'if' statement unit tested? If not, do so.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this still isn't unit tested.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but when I look at the code, in both cases, both branches of if, we create exactly the same virtual service (unless I was reading code incorrectly), even if a different from in-memory provisioner name is used we overwrite it to in-memory. So I am not sure what to test here, the only I could think of is if with a different than in-memory provisioner name, but that would test only golang's if. WDYT?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want the unit test to test that the created VirtualService has the same destination regardless of whether or not in-memory or in-memory-channel are used.

As it is, I don't think we have any unit tests for the new in-memory behavior. As that is the provisioner we want going forward, I would like it tested. And once that is tested, I want to also test that in-memory-channel behaves correctly, namely that the created VirtualService points at the correct place.

Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
@sbezverk
Copy link
Copy Markdown
Contributor Author

@Harwayne sorry, family emergency did not have time to push actual change.

@Harwayne
Copy link
Copy Markdown
Contributor

@Harwayne sorry, family emergency did not have time to push actual change.

No problem, I've definitely forgotten to push in the past and wanted to let you know why the review wasn't going to move forward. Thanks for pushing, looking now.

Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
},
Mocks: controllertesting.Mocks{},
WantPresent: []runtime.Object{
makeVirtualService(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also makeK8sService("in-memory")

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

},
Mocks: controllertesting.Mocks{},
WantPresent: []runtime.Object{
makeVirtualService(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also makeK8sService("in-memory")

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I think this should have been makeK8sService("in-memory-channel"). Sorry for the confusion.


func makeChannel() *eventingv1alpha1.Channel {
func makeChannel(pn ...string) *eventingv1alpha1.Channel {
provisionerName := ccpName
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clever way to default. Move it to a separate function and add a comment saying what you are doing.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
svc, err := util.CreateK8sService(ctx, r.client, c)
cCopy := c.DeepCopy()
cCopy.Spec.Provisioner.Name = asyncProvisionerName
svc, err := util.CreateK8sService(ctx, r.client, cCopy)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we always using in-memory here? Is it only because of my comment in the tests?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep, fixed it

},
Mocks: controllertesting.Mocks{},
WantPresent: []runtime.Object{
makeVirtualService(),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I think this should have been makeK8sService("in-memory-channel"). Sorry for the confusion.

Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
Copy link
Copy Markdown
Contributor

@Harwayne Harwayne left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/lgtm
/hold

Please manually test in your cluster to see that both in-memory and in-memory-channel Channels work simultaneously. Once tested, feel free to cancel the hold yourself.

Also test the new in-memory with the e2e test:

export CLUSTER=<entry from ~/.kube/config file>
export NAMESPACE=<something random>

# This creates all the images. Without it, I was either seeing images that couldn't be pulled or stale images.
test/upload-test-images.sh e2e

# Actually running the tests.
go test -v -tags=e2e -count=1 ./test/e2e -dockerrepo=gcr.io/$PROJECT_ID -cluster=$CLUSTER -namespace=$NAMESPACE -clusterChannelProvisioner=in-memory

@knative-prow-robot knative-prow-robot added the lgtm Indicates that a PR is ready to be merged. label Feb 1, 2019
@knative-prow-robot knative-prow-robot added the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Feb 1, 2019
@Harwayne
Copy link
Copy Markdown
Contributor

Harwayne commented Feb 1, 2019

/assign @grantr

Copy link
Copy Markdown
Contributor

@grantr grantr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for seeing this through @sbezverk and @Harwayne!

}

func makeK8sService() *corev1.Service {
func makeK8sService(pn ...string) *corev1.Service {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion for a different PR: Seems like this could benefit from the builder pattern for making fixtures with varying aspects. That might make it more clear to the reader what these arguments mean. See builder.go and SubscriptionBuilder.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for pointing out.

@knative-prow-robot
Copy link
Copy Markdown
Contributor

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: grantr, sbezverk

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@knative-prow-robot knative-prow-robot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Feb 1, 2019
@sbezverk
Copy link
Copy Markdown
Contributor Author

sbezverk commented Feb 2, 2019

@Harwayne Interesting problem :)
when old in memory channel type is used, we create k8s service with matching name knative-eventing in-memory-channel-dispatcher, but VirtualService uses:

  spec:
    hosts:
    - k8s-events-channel-s82gp.default.svc.sbezverk.cisco.com
    - k8s-events.default.channels.sbezverk.cisco.com
    http:
    - rewrite:
        authority: k8s-events.default.channels.sbezverk.cisco.com
      route:
      - destination:
          host: in-memory-dispatcher.knative-eventing.svc.sbezverk.cisco.com    <----- !!!!!!!
          port:
            number: 80
        weight: 0

VirtualService uses in destination new in-memory. That makes a breakage in istio. I checked route entries and it points to a non existing in-memory-dispatcher.knative-eventing.svc.sbezverk.cisco.com. I am afraid we might need to re-think this solution.

/hold

Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
@knative-prow-robot knative-prow-robot removed the lgtm Indicates that a PR is ready to be merged. label Feb 2, 2019
Copy link
Copy Markdown
Contributor

@Harwayne Harwayne left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/lgtm

@knative-prow-robot knative-prow-robot added the lgtm Indicates that a PR is ready to be merged. label Feb 2, 2019
@sbezverk
Copy link
Copy Markdown
Contributor Author

sbezverk commented Feb 2, 2019

/hold cancel

@knative-prow-robot knative-prow-robot removed the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Feb 2, 2019
Signed-off-by: Serguei Bezverkhi <sbezverk@cisco.com>
@knative-prow-robot knative-prow-robot removed the lgtm Indicates that a PR is ready to be merged. label Feb 2, 2019
@knative-metrics-robot
Copy link
Copy Markdown

The following is the coverage report on pkg/.
Say /test pull-knative-eventing-go-coverage to re-run this coverage report

File Old Coverage New Coverage Delta
pkg/controller/eventing/inmemory/channel/reconcile.go 97.9% 94.4% -3.5
pkg/controller/eventing/inmemory/clusterchannelprovisioner/reconcile.go 93.1% 93.4% 0.3

@sbezverk
Copy link
Copy Markdown
Contributor Author

sbezverk commented Feb 2, 2019

@Harwayne please lgtm, it was lost due to one final tiny change,

selector:
matchLabels: &labels
clusterChannelProvisioner: in-memory
clusterChannelProvisioner: in-memory-channel
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the change?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CI was failing e2e test

Copy link
Copy Markdown
Contributor

@Harwayne Harwayne left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/lgtm

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved Indicates a PR has been approved by an approver from all required OWNERS files. cla: yes Indicates the PR's author has signed the CLA. lgtm Indicates that a PR is ready to be merged. size/L Denotes a PR that changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants