[WIP] Add NATS Streaming as a new bus implementation#491
Conversation
devguyio
left a comment
@radufa I added a couple of cosmetic comments. Will test locally and provide any extra feedback.
  labels:
    app: nats-streaming
spec:
  # type: NodePort

Please remove commented line
spec:
  # type: NodePort
  type: ClusterIP
  # clusterIP: None

Please remove commented line
1. Apply the 'natss' Bus `ko apply -f config/buses/natss/`
1. Create Channels that reference the 'natss' Bus

The NATSS-Streaming bus uses NATS-Streaming based on a simple setup, see [Natss Streaming](./100-natss.yaml).

NATS Streaming without a hyphen.
	}
}
// subscribe to a NATSS subject
if natsStreamingSub, err := (*natsConn).Subscribe(channel.Name, mcb /*nmh.msgh*/, stan.DurableName(subscription.Name)); err != nil {

Remove the comment /*nmh.msgh*/
apiVersion: v1
kind: Service
metadata:
  name: nats-streaming-headless

Minor naming comment: I suppose headless is used in case clusterIP: None. Please consider removing headless.
vaikas
left a comment
I'll review the rest later, just got through the yaml files.
# configuration file used to override default NATS server settings
stan.conf: |
# content of configuration file used to override default NATS Streaming server settings
kind: ConfigMap

bit of a nit, could you organize this:
apiVersion
kind
metadata
data
makes it easier to parse mentally :)
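For illustration, a sketch of the suggested key ordering — the metadata values below are assumptions for the example, not taken from the PR:

```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  # illustrative name/namespace, not the PR's actual values
  name: nats-streaming-config
  namespace: knative-eventing
data:
  # content of configuration file used to override
  # default NATS Streaming server settings
  stan.conf: |
    # server overrides would go here
```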
args:
- -D
- -SD
- --cluster_id=knative-nats-streaming

is this something that needs to be possibly updated when deploying?

Yes, this is a kind of a shared "security context" between all NATS Streaming servers running in a cluster and all their clients. For now it's only a simple deployment without clustering.
| - "ReadWriteOnce" | ||
| resources: | ||
| requests: | ||
| storage: "1Gi" |
There was a problem hiding this comment.
Is this for the persistence of the messages? If so, is there a way to get a rough idea how this might be tweaked depending on message rates / sizes, etc.?
There was a problem hiding this comment.
Yes, this is for persistence, for all messages... We have to improve the strategy for the persistence layer in the future, but it's enough for this simple example.
/retest

1 similar comment

/retest

/cc @adamharwayne
selector:
  app: nats-streaming
sessionAffinity: None
selector:

selector is defined twice.
kind: StatefulSet
metadata:
  name: nats-streaming
  namespace: default

Should this be in the knative-eventing namespace?

It is now deployed in the knative-eventing namespace.
requests:
  cpu: "100m"
limits:
  memory: "32M"

This seems low, is it enough?

It's enough for this simple, not production-ready, "bus" implementation. We will come later with a NATS Streaming cluster which will be more deeply integrated into Eventing.
}
// subscribe to a NATSS subject
if natsStreamingSub, err := (*natsConn).Subscribe(channel.Name, mcb, stan.DurableName(subscription.Name)); err != nil {
	bus.logger.Errorf(" Create new NATSS Subscription failed: %+v", err)

// unsubscribe from a NATSS subject
if natsStreamingSub, ok := bus.subscribers[subscription.Name]; ok {
	if err := (*natsStreamingSub).Unsubscribe(); err != nil {
		bus.logger.Errorf(" Unsubscribe() failed: %+v", err)
ProvisionFunc: func(channel buses.ChannelReference, parameters buses.ResolvedParameters) error {
	bus.logger.Infof("Provision channel %q\n", channel.Name)
	bus.logger.Info("channel=%+v; parameters=%+v", channel, parameters)
	// TODO create a NATSS subject

Is there anything to do? Aren't the subjects created with the subscriptions, rather than the bus itself? If so, I think we can simply not assign ProvisionFunc and UnprovisionFunc.

It's only a placeholder now, because the creation of NATS Streaming subjects will be validated in the future, avoiding automatic (uncontrolled) creation. For this simple implementation, this code is not necessary now; a subject is created, if necessary, with the subscription. I'll delete it if it has an impact on the clarity of this example.

My preference is to either fill out the TODO with what is actually planned or to remove them. But it is weakly held.
}

// IsConnected ....
func IsConnected(sc *stan.Conn) (ok bool) {
const BusType = "natss"

type NatssBus struct {
	natsConn *stan.Conn

I don't think this is used.
// TODO delete a NATSS subject
return nil
},
SubscribeFunc: func(channel buses.ChannelReference, subscription buses.SubscriptionReference, parameters buses.ResolvedParameters) error {

For ease of reading, can these be moved to functions on NatssBus?

SubscribeFunc: bus.subscribeFunc,
...
func (b *NatssBus) subscribeFunc(...)
// TODO delete a NATSS subject
return nil
},
SubscribeFunc: func(channel buses.ChannelReference, subscription buses.SubscriptionReference, parameters buses.ResolvedParameters) error {

I think SubscribeFunc and UnsubscribeFunc should be set in the Dispatcher, not the Provisioner.
/retest

The last integration test failures were due to an unrelated issue impacting all PRs that has since been fixed. Submitting this one for testing again. /test pull-knative-eventing-integration-tests

@adamharwayne I have made the changes you proposed. Please have a look.

Should we add a new bus vs provisioner?

@n3wscott Are you referring to the Provisioner part of the bus implementation (see in this case the discussion with @adamharwayne)?
adamharwayne
left a comment
Overall looks good to me. Please add some unit tests and I will be happy to approve.
		return err
	}
	delete(bus.subscribers, subscription.Name)
	bus.logger.Errorf(" Unsubscribe() successful.")
	}
}
// subscribe to a NATSS subject
aw, _ := time.ParseDuration("60s")
ProvisionFunc: func(channel buses.ChannelReference, parameters buses.ResolvedParameters) error {
	bus.logger.Infof("Provision channel %q\n", channel.Name)
	bus.logger.Info("channel=%+v; parameters=%+v", channel, parameters)
	// TODO create a NATSS subject

My preference is to either fill out the TODO with what is actually planned or to remove them. But it is weakly held.
 * limitations under the License.
 */

package stanutil

Please add unit tests for this package.

Done. The unit tests use an embedded NATS Streaming server.
import "testing"

func TestStub(t *testing.T) {

Please add in some tests.

@adamharwayne the NATS Streaming server can be embedded inside the test process to run unit tests against a real NATS Streaming server. This implies vendoring the NATS Streaming server code in Eventing. Of course, mocking NATS Streaming could be another solution. Since this is the first bus implementation which should add tests, please advise.

So far we haven't run the real implementations in unit tests, but it seems like a good idea as long as:
- It isn't flaky
- It doesn't take too long (upper bound of 1 minute to start with?).

I didn't realize that the other buses didn't have tests. If you don't vendor the NATS Streaming server, then I would focus on testing the event handler functions (e.g. handles errors gracefully, only ACKs the message if the downstream acked, etc.).

Thanks! I'll use the embedded NATS Streaming server for the unit tests. It's stable and the startup time is approx. 10s.
Thanks for your pull request. It looks like this may be your first contribution to a Google open source project. Before we can look at your pull request, you'll need to sign a Contributor License Agreement (CLA). 📝 Please visit https://cla.developers.google.com/ to sign. Once you've signed (or fixed any issues), please reply here.

I signed it!

CLAs look good, thanks!

/cc @adamharwayne
adamharwayne
left a comment
Thanks for adding the tests! I suddenly see why none of our existing buses have tests...
@@ -0,0 +1,11 @@
# Stub - Knative Bus

Suggested change: "# Stub - Knative Bus" → "# NATS Streaming - Knative Bus"
)

const (
	clusterId = "knative-eventing"
func TestConnectPublishClose(t *testing.T) {
	// connect
	natsConn, err := Connect(clusterId, clientId, natssUrl, logger)
	assert.Nil(t, err)

I think our standard way to do this is:

if err != nil {
	t.Error("Connect returned nil")
}

Replacing the error message as needed.

Done, also I've removed "testify" from vendor.
func TestConnectPublishClose(t *testing.T) {
	// connect
	natsConn, err := Connect(clusterId, clientId, natssUrl, logger)

I would expect this to defer closing here (well, just after checking if err is not nil):

defer Close(natsConn, logger)

if err != nil {
	panic(err)
}
retCode := m.Run()

I would expect this to defer stopNatss here, now that we know the err is nil:

defer stopNatss(stanServer)
retCode := m.Run()

busProv.Run(1, stopCh, "test-provisioner-natss")
busDisp.Run(1, stopCh, "test-dispatcher-natss")

// create a dummy channel
// wait for subscriber to respond
select {
case <-done:
	logger.Infof("Subscriber finished")
case <-time.After(5 * time.Second):
	logger.Info("Subscriber timeout")

Shouldn't this fail the test?
func SetNewBusProvisioner(opts *buses.BusOpts) SetupNatssBus {
	return func(b *NatssBus) error {
		b.natsUrl = NatssUrl
		b.provisioner = buses.NewBusProvisioner(b.ref, b.dispEventHandler, opts)

b.dispEventHandler -> b.provEventHandler
busDisp, err := NewNatssBusDispatcher(ref, setNewTestBusDispatcher(t, opts))
assert.Nil(t, err)

stopCh := signals.SetupSignalHandler()

Should we defer closing/sending to this Channel?
// wait for subscriber to respond
select {
case <-done:
	logger.Infof("Subscriber finished")

Check the message payload.
/retest

/test pull-knative-eventing-unit-tests

1 similar comment

/test pull-knative-eventing-unit-tests
)

func TestMain(m *testing.M) {
	logger = buses.NewBusLoggerFromConfig(buses.NewLoggingConfig())

Does anything rely on the logs being written? E.g. something searches the log output to see if 'foobar' was written. Or does it just need a struct that has the right methods? zap.NewNop() returns a valid logger, but the logs don't get written anywhere.
)

func TestMain(m *testing.M) {
	logger = buses.NewBusLoggerFromConfig(buses.NewLoggingConfig())

Same as above. Do we need the actual logs? Or just a logger (that writes to /dev/null)?
	}
}

func SetNewBusProvisioner(opts *buses.BusOpts) SetupNatssBus {

Thanks for investigating. I think this is fine as-is for this PR. I think we need to create a fake Reconciler (not in this PR, just in general, to make this stuff testable). Then we could set it in the options and not need this workaround:

opts.Reconciler = FakeReconciler()

As you call bus.Run(), just creating an empty Reconciler{} doesn't work either. Random musing: maybe we just need to make Reconciler an interface and swap out Run() to do nothing (as is done in the test file).
n3wscott
left a comment
Apologies for the churn, we need to refactor this PR to use ClusterChannelProvisioners. As it is, I don't think we want to commit any new changes to bus impls; we have changed direction a bit on what it means to back a channel.
@@ -0,0 +1,101 @@
apiVersion: v1

It should actually be embedded into the controller for the provisioner for the natss channels, and then published into Kubernetes when a channel is attempted to be provisioned with the natss cluster channel provisioner.
@@ -0,0 +1,9 @@
# NATS Streaming - Knative Bus

The current model does not have buses, and sorry to say that we will have to pivot this implementation to conform to the ideas of ClusterChannelProvisioners.
ReceiveMessageFunc: func(channel buses.ChannelReference, message *buses.Message) error {
	bus.logger.Infof("Recieved message from %q channel", channel.String())
	if err := stanutil.Publish(natsConn, channel.Name, &message.Payload, bus.logger); err != nil {
		bus.logger.Errorf("Error during publish: %+v", err)

Zap wants you to wrap err in zap.Error(err) for pretty print.
		bus.logger.Errorf("Error during publish: %+v", err)
		return err
	}
	bus.logger.Infof("Published [%s] : '%s'", channel.String(), message)

We should not log the contents of the message. There is a huge risk of leaking PII here.
var err error
for i := 0; i < 60; i++ {
	if natsConn, err = stanutil.Connect("knative-nats-streaming", clientId, b.natssUrl, b.logger); err != nil {
		b.logger.Errorf(" Create new connection failed: %+v", err)

zap.Error(err) for pretty print.
}

// Publish a message to a subject
func Publish(sc *stan.Conn, subj string, msg *[]byte, logger *zap.SugaredLogger) (err error) {

In general, loggers should not be passed around, only context; then pull the logger from the context. There are examples in the repo that do this with the knative/pkg/logging helpers.
defer func() {
	if r := recover(); r != nil {
		err = fmt.Errorf("recovered from: %v", r)
		logger.Errorf("Publish(): %v", err.Error())

zap.Error(err) for pretty print.
	}
	err = (*sc).Publish(subj, *msg)
	if err != nil {
		logger.Errorf("Error during publish: %v\n", err)

zap.Error(err) for pretty print.
}()

if sc == nil {
	err = errors.New("cant'publish on empty stan connection")

typo: -> "can't publish on empty stan connection"

likely it should be "can not publish on empty stan connection"
@@ -0,0 +1,73 @@
package stanutil
Thanks a lot for the reviews!

- move NATSS into the knative-eventing namespace
- use zap logger in stanutil
- use manual Ack for NATSS messages
- remove "testify" from vendor
- correct log messages
- update Readme.md
The following is the coverage report on pkg/.

@radufa: The following test failed.

Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository.
@radufa I guess this can be closed, due to #624?

Closing this PR because it is replaced by the new one: "Add NATS Streaming provisioner", see #624.