Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ import (
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.uber.org/zap"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
crlog "sigs.k8s.io/controller-runtime/pkg/runtime/log"
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't seem right -- why not just remove it?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept it around as it helps with debugging. When I started working on controllers or anything that connects to GKE from my local machine I would get some config related error. After searching online I found threads that it is because of missing package. So I added it here and then commented it out as a nicety that could help others working on the code base.

)

var (
Expand Down
16 changes: 7 additions & 9 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,13 @@ import (
"os"
"time"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/logconfig"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/broker"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/channel"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/namespace"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/subscription"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/trigger"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"

// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/logconfig"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
"github.com/knative/pkg/configmap"
"github.com/knative/pkg/logging"
Expand All @@ -46,9 +39,14 @@ import (
"github.com/knative/pkg/system"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
controllerruntime "sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, why not remove this here? It's clearly not needed, and seems like it would be simpler to just remove it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept it around as it helps with debugging. When I started working on controllers or anything that connects to GKE from my local machine I would get some config related error. After searching online I found threads that it is because of missing package. So I added it here and then commented it out as a nicety that could help others working on the code base.

)

const (
Expand Down
152 changes: 92 additions & 60 deletions cmd/fanoutsidecar/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,49 +25,47 @@ import (
"fmt"
"log"
"net/http"
"strings"
"time"

"github.com/knative/eventing/pkg/sidecar/configmap/filesystem"
"github.com/knative/eventing/pkg/sidecar/configmap/watcher"
"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/channelwatcher"
"github.com/knative/eventing/pkg/logging"
"github.com/knative/eventing/pkg/sidecar/fanout"
"github.com/knative/eventing/pkg/sidecar/multichannelfanout"
"github.com/knative/eventing/pkg/sidecar/swappable"
"github.com/knative/eventing/pkg/utils"
"github.com/knative/pkg/system"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/client-go/kubernetes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
)

const (
defaultConfigMapName = "in-memory-channel-dispatcher-config-map"

// The following are the only valid values of the config_map_noticer flag.
cmnfVolume = "volume"
cmnfWatcher = "watcher"
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

var (
readTimeout = 1 * time.Minute
writeTimeout = 1 * time.Minute

port int
configMapNoticer string
configMapNamespace string
configMapName string
port int
channelProvisioners listFlags
)

func init() {
flag.IntVar(&port, "sidecar_port", -1, "The port to run the sidecar on.")
flag.StringVar(&configMapNoticer, "config_map_noticer", "", fmt.Sprintf("The system to notice changes to the ConfigMap. Valid values are: %s", configMapNoticerValues()))
flag.StringVar(&configMapNamespace, "config_map_namespace", system.Namespace(), "The namespace of the ConfigMap that is watched for configuration.")
flag.StringVar(&configMapName, "config_map_name", defaultConfigMapName, "The name of the ConfigMap that is watched for configuration.")
type listFlags []string

func (l *listFlags) String() string {
return ""
}
func (l *listFlags) Set(value string) error {
*l = append(*l, value)
return nil
}

func configMapNoticerValues() string {
return strings.Join([]string{cmnfVolume, cmnfWatcher}, ", ")
func init() {
flag.IntVar(&port, "sidecar_port", -1, "The port to run the sidecar on.")
flag.Var(&channelProvisioners, "channel_provisioner", "The provisioner of the channels that will be watched.")
}

func main() {
Expand All @@ -84,14 +82,18 @@ func main() {
logger.Fatal("--sidecar_port flag must be set")
}

if len(channelProvisioners) < 1 {
logger.Fatal("--channel_provisioner must be specified")
}

sh, err := swappable.NewEmptyHandler(logger)
if err != nil {
logger.Fatal("Unable to create swappable.Handler", zap.Error(err))
}

mgr, err := setupConfigMapNoticer(logger, sh.UpdateConfig)
mgr, err := setupChannelWatcher(logger, sh.UpdateConfig)
Comment thread
akashrv marked this conversation as resolved.
if err != nil {
logger.Fatal("Unable to create configMap noticer.", zap.Error(err))
logger.Fatal("Unable to create channel watcher.", zap.Error(err))
}

s := &http.Server{
Expand Down Expand Up @@ -125,57 +127,87 @@ func main() {
}
}

func setupConfigMapNoticer(logger *zap.Logger, configUpdated swappable.UpdateConfig) (manager.Manager, error) {
func setupChannelWatcher(logger *zap.Logger, configUpdated swappable.UpdateConfig) (manager.Manager, error) {
mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
logger.Error("Error starting manager.", zap.Error(err))
logger.Error("Error creating new maanger.", zap.Error(err))
return nil, err
}

switch configMapNoticer {
case cmnfVolume:
err = setupConfigMapVolume(logger, mgr, configUpdated)
case cmnfWatcher:
err = setupConfigMapWatcher(logger, mgr, configUpdated)
default:
err = fmt.Errorf("need to provide the --config_map_noticer flag (valid values are %s)", configMapNoticerValues())
}
if err != nil {
if err = v1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Error("Error while adding eventing scheme to manager.", zap.Error(err))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this include the scheme?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the question whether scheme is needed or should it be in this function?

  • Adding this scheme is necessary or else the watch through manager won't work

  • I considered whether I should add it here or in the channelwatcher.New function. Looking at rest of the code where controllers are created, I found that schemes are added in cmd/main rather than inside the controller. So I kept it this way to be consistent with rest of the code.

return nil, err
}
channelwatcher.New(mgr, logger, updateChannelConfig(configUpdated))

return mgr, nil
}

func setupConfigMapVolume(logger *zap.Logger, mgr manager.Manager, configUpdated swappable.UpdateConfig) error {
cmn, err := filesystem.NewConfigMapWatcher(logger, filesystem.ConfigDir, configUpdated)
if err != nil {
logger.Error("Unable to create filesystem.ConifgMapWatcher", zap.Error(err))
return err
}
if err = mgr.Add(cmn); err != nil {
logger.Error("Unable to add the config map watcher", zap.Error(err))
return err
func updateChannelConfig(updateConfig swappable.UpdateConfig) channelwatcher.WatchHandlerFunc {
Comment thread
akashrv marked this conversation as resolved.
return func(ctx context.Context, c client.Client, chanNamespacedName types.NamespacedName) error {
channels, err := listAllChannels(ctx, c)
if err != nil {
logging.FromContext(ctx).Info("Unable to list channels", zap.Error(err))
return err
}
config := multiChannelFanoutConfig(channels)
return updateConfig(config)
}
return nil
}

func setupConfigMapWatcher(logger *zap.Logger, mgr manager.Manager, configUpdated swappable.UpdateConfig) error {
kc, err := kubernetes.NewForConfig(mgr.GetConfig())
if err != nil {
return err
func listAllChannels(ctx context.Context, c client.Client) ([]v1alpha1.Channel, error) {
channels := make([]v1alpha1.Channel, 0)
for {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be worth a comment here indicating that this is a do... while loop on opts.Raw.Continue.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In particular, I'm surprised that the client.Client interface doesn't perform the full listing.

A reading of the ListOptions struct suggests that pagination may only be provided if limit is set (and I suspect that many clients would have failures if this wasn't the case). Do you have evidence that the more complicated code with continue tokens is needed?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably copied from other code that added pagination support defensively (@Harwayne may be able to provide context). Based on my reading of the original design for apiserver pagination, it appears @evankanderson is correct: pagination is opt-in by specifying the limit parameter.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am paranoid. I don't think it gets used today. I believe @akashrv looked into the Controller Runtime code in particular and confirmed that it isn't used. But the fact that it is in the interface makes me wary of leaving it out. I agree that ListOptions.Limit clearly talks about Continue with regards to having a limit set, but ListOptions.Continue doesn't specify that is the only time Continue will be used.

I agree with your reading of the origin design that this is purely an opt-in feature and that not providing a limit means get everything (even if that is 500 MB). Having been shown this, I am OK removing it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. I'll simplify it and revert it to how it was in first iteration.

cl := &v1alpha1.ChannelList{}
opts := &client.ListOptions{
// Set Raw because if we need to get more than one page, then we will put the continue token
// into opts.Raw.Continue.
Raw: &metav1.ListOptions{},
}
if err := c.List(ctx, opts, cl); err != nil {
return nil, err
}
for _, c := range cl.Items {
if c.Status.IsReady() && shouldWatch(&c) {
channels = append(channels, c)
}
}
if cl.Continue != "" {
opts.Raw.Continue = cl.Continue
} else {
return channels, nil
}
Comment thread
akashrv marked this conversation as resolved.
}
}

cmw, err := watcher.NewWatcher(logger, kc, configMapNamespace, configMapName, configUpdated)
if err != nil {
return err
func shouldWatch(ch *v1alpha1.Channel) bool {
if ch.Spec.Provisioner != nil && ch.Spec.Provisioner.Namespace == "" {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe make namespace non-empty an early exit with a comment that we only support cluster-scoped provisioners (so it's easier to extend to namespace-scoped provisioners later)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't understand the comment. This function is used to filter out channels in the handler inside the watch. It is not used in the request path.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was suggesting writing this as:

if ch.Spec.Provisioner == nil || ch.Spec.Provisioner.Namespace != "" {
  // Only support cluster-level provisioners right now.
  return false
}
...

for _, v := range channelProvisioners {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make more sense to make channelProvisioners a map at init time, so you can use a map membership check?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't expect to have too many provisioners. Most cases this should be just one check without a for loop.
Moreover when we rename fanoutsidecar to be in-memory provisioner and deprecate the old in-memory-channel. This code will change to a single value check.
Hence left it this way.

if v == ch.Spec.Provisioner.Name {
return true
}
}
}
return false
}

if err = mgr.Add(utils.NewBlockingStart(logger, cmw)); err != nil {
logger.Error("Unable to add the config map watcher", zap.Error(err))
return err
func multiChannelFanoutConfig(channels []v1alpha1.Channel) *multichannelfanout.Config {
cc := make([]multichannelfanout.ChannelConfig, 0)
for _, c := range channels {
channelConfig := multichannelfanout.ChannelConfig{
Namespace: c.Namespace,
Name: c.Name,
HostName: c.Status.Address.Hostname,
}
if c.Spec.Subscribable != nil {
channelConfig.FanoutConfig = fanout.Config{
Subscriptions: c.Spec.Subscribable.Subscribers,
}
}
cc = append(cc, channelConfig)
}
return &multichannelfanout.Config{
ChannelConfigs: cc,
}
return nil
}

// runnableServer is a small wrapper around http.Server so that it matches the manager.Runnable
Expand Down
42 changes: 5 additions & 37 deletions config/provisioners/in-memory-channel/in-memory-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ rules:
- apiGroups:
- "" # Core API group.
resources:
- configmaps
- services
verbs:
- get
Expand All @@ -83,24 +82,6 @@ rules:
- services
verbs:
- update
- apiGroups:
- "" # Core API Group.
resources:
- configmaps
resourceNames:
- in-memory-channel-dispatcher-config-map
verbs:
- update
- apiGroups:
- networking.istio.io
resources:
- virtualservices
verbs:
- get
- list
- watch
- create
- update
- apiGroups:
- "" # Core API Group.
resources:
Expand Down Expand Up @@ -168,9 +149,10 @@ metadata:
name: in-memory-channel-dispatcher
rules:
- apiGroups:
- "" # Core API group.
- "eventing.knative.dev"
resources:
- configmaps
- "channels"
- "channels/status"
verbs:
- get
- list
Expand Down Expand Up @@ -216,24 +198,10 @@ spec:
image: github.com/knative/eventing/cmd/fanoutsidecar
args:
- --sidecar_port=8080
- --config_map_noticer=watcher
- --config_map_namespace=knative-eventing
- --config_map_name=in-memory-channel-dispatcher-config-map
- --channel_provisioner=in-memory
- --channel_provisioner=in-memory-channel
env:
- name: SYSTEM_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace

---

# Create the ConfigMap, because if we don't the dispatcher will flap when it first comes online and
# this can cause the integration tests to fail.

apiVersion: v1
kind: ConfigMap
metadata:
name: in-memory-channel-dispatcher-config-map
namespace: knative-eventing
data:
multiChannelFanoutConfig: '{}'
12 changes: 6 additions & 6 deletions contrib/kafka/cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ import (
"flag"
"os"

provisionerController "github.com/knative/eventing/contrib/kafka/pkg/controller"
"github.com/knative/eventing/contrib/kafka/pkg/controller/channel"
eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"

provisionerController "github.com/knative/eventing/contrib/kafka/pkg/controller"
"github.com/knative/eventing/contrib/kafka/pkg/controller/channel"
eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

// SchemeFunc adds types to a Scheme.
Expand Down
12 changes: 6 additions & 6 deletions contrib/kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@ import (
"flag"
"os"

provisionerController "github.com/knative/eventing/contrib/kafka/pkg/controller"
"github.com/knative/eventing/contrib/kafka/pkg/controller/channel"
eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"

provisionerController "github.com/knative/eventing/contrib/kafka/pkg/controller"
"github.com/knative/eventing/contrib/kafka/pkg/controller/channel"
eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
// Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters).
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
)

const (
Expand Down
Loading