Add NATS Streaming provisioner#624
Conversation
| @@ -0,0 +1,101 @@ | |||
| apiVersion: v1 | |||
There was a problem hiding this comment.
Can you add a README at the root of the natss folder, similar like the in-memory and kafka channel provisioners have it, e.g.:
https://github.com/knative/eventing/tree/master/config/provisioners/kafka
There was a problem hiding this comment.
feel free to include links to setup for nats(s) as well
|
|
||
| NATS Streaming is deployed as a StatefulSet. | ||
| For tuning NATS Streaming, see: | ||
| https://github.com/nats-io/nats-streaming-server#configuring |
There was a problem hiding this comment.
Also state the name of the ConfigMap that contains the configuration.
| r.logger.Info("Not reconciling Channel, it is not controlled by this Controller", zap.Any("ref", c.Spec)) | ||
| return reconcile.Result{}, nil | ||
| } | ||
| //r.logger.Info("Reconciling Channel:", zap.Any("channel", c)) |
There was a problem hiding this comment.
Either uncomment or remove.
|
|
||
| // Modify a copy, not the original. | ||
| c = c.DeepCopy() | ||
| err = r.reconcile(ctx, c) |
There was a problem hiding this comment.
I think this follows a pattern in other reoncilers, but I changed my preference recently and think that this should be in a new variable reconcileErr.
My reasoning is that err tends to be checked and handled immediately, without the intention of using it again in the future. As this piece of code wants to use err again later, I think it is useful to give it a different name.
|
|
||
| if c.DeletionTimestamp != nil { | ||
| // K8s garbage collection will delete the K8s service and VirtualService for this channel. | ||
| // We use a finalizer to ensure the channel config has been synced. |
There was a problem hiding this comment.
It doesn't look like there is any config that is synced. Do we need a finalizer at all?
| // Modify a copy of this object, rather than the original. | ||
| ccp = ccp.DeepCopy() | ||
|
|
||
| err = r.reconcile(ctx, ccp) |
There was a problem hiding this comment.
reconcileErr, using the same reasoning as above.
| // Modify a copy, not the original. | ||
| c = c.DeepCopy() | ||
|
|
||
| err = r.reconcile(ctx, c) |
There was a problem hiding this comment.
reconcileErr, same reasoning as above.
| err = errors.New("can't close empty stan connection") | ||
| return | ||
| } | ||
| err = (*sc).Close() |
There was a problem hiding this comment.
Is the dereference needed? Does sc.Close() work?
There was a problem hiding this comment.
Here "sc" is a pointer to an interface, see it in the function signature: "sc *stan.Conn"
|
|
||
| // Close must be the last call to close the connection | ||
| func Close(sc *stan.Conn, logger *zap.SugaredLogger) (err error) { | ||
| defer func() { |
There was a problem hiding this comment.
Add a comment about why this might panic, so we defer a recover().
|
|
||
| // Publish a message to a subject | ||
| func Publish(sc *stan.Conn, subj string, msg *[]byte, logger *zap.SugaredLogger) (err error) { | ||
| defer func() { |
There was a problem hiding this comment.
Add a comment about why this might panic, so we defer a recover().
| err = errors.New("cant'publish on empty stan connection") | ||
| return | ||
| } | ||
| err = (*sc).Publish(subj, *msg) |
There was a problem hiding this comment.
Is the dereference needed? Does sc.Close() work?
There was a problem hiding this comment.
"sc" is a pointer to an interface
| ) | ||
|
|
||
| var ( | ||
| natsConnMux sync.Mutex |
There was a problem hiding this comment.
Add a comment about when this lock needs to be held.
| kind: Service | ||
| metadata: | ||
| name: nats-streaming | ||
| namespace: knative-eventing |
There was a problem hiding this comment.
not sure if the nats-streaming bit should be in the knative-eventing ns
There was a problem hiding this comment.
NATSS is running now in its own namespace: natss
Done
| 1. Setup [Knative Eventing](../../../DEVELOPMENT.md). | ||
| 1. Apply the 'natss' ClusterChannelProvisioner, Controller, and Dispatcher. | ||
| ```shell | ||
| ko apply -f config/provisioners/natss/ |
There was a problem hiding this comment.
The above would provision natss, and the cpp - right ?
I think I'd prefer some separation here (e.g see Kafka). and that the actual provisioning of the natss bits would be more a sample - as well as showing how to use existing/external natss.
and than for the ccp, have just something like:
ko apply -f config/provisioners/natss/natss.yaml
| - list | ||
| - watch | ||
| - create | ||
|
|
There was a problem hiding this comment.
- update
The common lib was changed to update the VirtualService if it has an unexpected spec.
| sidecar.istio.io/inject: "true" | ||
| labels: *labels | ||
| spec: | ||
| serviceAccountName: natss-controller |
There was a problem hiding this comment.
Ideally this would use a different service account (as it needs fewer permissions than the controller). Doesn't need to be this PR, but please add a TODO to use a different service account.
There was a problem hiding this comment.
The dispatcher uses now "natss-dispatcher" as ServiceAccount
Done
| "sigs.k8s.io/controller-runtime/pkg/client/fake" | ||
| ) | ||
|
|
||
| func TestInjectClient(t *testing.T) { |
There was a problem hiding this comment.
Should this be in reconcile_test.go?
| // The Channel may have been deleted since it was added to the workqueue. | ||
| if errors.IsNotFound(err) { | ||
| r.logger.Info("Could not find Channel", zap.Error(err)) | ||
| // unsubscribe all active subscriptions for this channel |
There was a problem hiding this comment.
I'm not sure we can rely on this to notice Channel deletion. I would expect to use a finalizer if you want to notice the channel is being deleted.
There was a problem hiding this comment.
Channel finalizer is used now to cleanup the NATSS active subscriptions.
Done
- add natss-dispatcher ServiceAccount
|
The following is the coverage report on pkg/.
|
| s.subscriptions[cRef] = chMap | ||
| } | ||
| for _, sub := range subscriptions { | ||
| // check if the subscribtion already exist and do nothing in this case |
There was a problem hiding this comment.
subscribtion -> subscription
| } | ||
| // Unsubscribe for deleted subscriptions | ||
| for sub := range chMap { | ||
| if ok := activeSubs[sub]; !ok { |
There was a problem hiding this comment.
Because this is a set, we probably shouldn't rely on the value being true
if _, ok := activeSubs[sub]; !ok {
evankanderson
left a comment
There was a problem hiding this comment.
/approve
/hold for comments
| @@ -0,0 +1,44 @@ | |||
| # NATS Streaming Channels | |||
|
|
|||
There was a problem hiding this comment.
It might be worth providing a quick overview here of the benefits of NATS Streaming:
Approx throughput?
Durability / chance of data loss?
Resource requirements?
Ordering guarantees?
| app: nats-streaming | ||
| spec: | ||
| serviceName: nats-streaming | ||
| replicas: 1 |
There was a problem hiding this comment.
Does 1 replica have durability implications? Can this number be changed on the fly?
| return reconcile.Result{}, reconcileErr | ||
| } | ||
|
|
||
| // IsControlled determines if the in-memory Channel Controller should control (and therefore |
There was a problem hiding this comment.
s/in-memory/nats/, correct?
There was a problem hiding this comment.
or s/in-memory Channel/this/
| for i := 0; i < 60; i++ { | ||
| if sc, err = stan.Connect(clusterId, clientId, stan.NatsURL(natsUrl)); err != nil { | ||
| logger.Warnf("Connect(): create new connection failed: %v", err) | ||
| time.Sleep(1 * time.Second) |
There was a problem hiding this comment.
Given the 1-60 loop above, I don't know that changing this sleep alone will help much.
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: evankanderson, radufa The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
|
Feel free to do a followup PR for the comments; I wanted to strike while Adam's review was hot. |
Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
Proposed Changes