diff --git a/Gopkg.lock b/Gopkg.lock index f69177a62d6..209221172b3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -24,6 +24,14 @@ pruneopts = "NUT" revision = "3a771d992973f24aa725d07868b467d1ddfceafb" +[[projects]] + digest = "1:be1e32779b5150a3e4341f584f14282db95feff53f60d49b17134a5710ac8ff9" + name = "github.com/bsm/sarama-cluster" + packages = ["."] + pruneopts = "NUT" + revision = "c618e605e15c0d7535f6c96ff8efbb0dba4fd66c" + version = "v2.1.15" + [[projects]] digest = "1:a2c1d0e43bd3baaa071d1b9ed72c27d78169b2b269f71c105ac4ba34b1be4a39" name = "github.com/davecgh/go-spew" @@ -1011,6 +1019,7 @@ analyzer-version = 1 input-imports = [ "github.com/Shopify/sarama", + "github.com/bsm/sarama-cluster", "github.com/fsnotify/fsnotify", "github.com/golang/glog", "github.com/google/go-cmp/cmp", diff --git a/config/provisioners/kafka/kafka-provisioner.yaml b/config/provisioners/kafka/kafka-provisioner.yaml index 78039fc55fb..94eb1f53f9d 100644 --- a/config/provisioners/kafka/kafka-provisioner.yaml +++ b/config/provisioners/kafka/kafka-provisioner.yaml @@ -45,11 +45,20 @@ rules: - "" # Core API group. resources: - services + - configmaps verbs: - get - list - watch - create + - apiGroups: + - "" # Core API Group. + resources: + - configmaps + resourceNames: + - kafka-channel-dispatcher + verbs: + - update - apiGroups: - networking.istio.io resources: @@ -81,7 +90,7 @@ metadata: name: kafka-channel-controller-config namespace: knative-eventing data: - # Broker URL's for the provisioner + # Broker URL's for the provisioner. Replace this with the URL's for your kafka cluster. bootstrap_servers: kafkabroker.kafka:9092 --- @@ -100,7 +109,7 @@ spec: serviceAccountName: kafka-channel-controller containers: - name: kafka-channel-controller-controller - image: github.com/knative/eventing/pkg/provisioners/kafka + image: github.com/knative/eventing/pkg/provisioners/kafka/cmd/controller volumeMounts: - name: kafka-channel-controller-config mountPath: /etc/config-provisioner @@ -108,3 +117,82 @@ spec: - name: kafka-channel-controller-config configMap: name: kafka-channel-controller-config +--- + +apiVersion: v1 +kind: ServiceAccount +metadata: + name: kafka-channel-dispatcher + namespace: knative-eventing + +--- + +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: kafka-channel-dispatcher + namespace: knative-eventing +rules: + - apiGroups: + - "" # Core API group. 
+ resources: + - configmaps + verbs: + - get + - list + - watch + +--- + +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: ClusterRoleBinding +metadata: + name: kafka-channel-dispatcher + namespace: knative-eventing +subjects: + - kind: ServiceAccount + name: kafka-channel-dispatcher + namespace: knative-eventing +roleRef: + kind: ClusterRole + name: kafka-channel-dispatcher + apiGroup: rbac.authorization.k8s.io + +--- + +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: kafka-channel-dispatcher + namespace: knative-eventing +spec: + replicas: 1 + selector: + matchLabels: &labels + clusterChannelProvisioner: kafka-channel + role: dispatcher + serviceName: kafka-channel-dispatcher-service + template: + metadata: + annotations: + sidecar.istio.io/inject: "true" + labels: *labels + spec: + serviceAccountName: kafka-channel-dispatcher + containers: + - name: dispatcher + image: github.com/knative/eventing/pkg/provisioners/kafka/cmd/dispatcher + env: + - name: DISPATCHER_CONFIGMAP_NAME + value: kafka-channel-dispatcher + - name: DISPATCHER_CONFIGMAP_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + volumeMounts: + - name: kafka-channel-controller-config + mountPath: /etc/config-provisioner + volumes: + - name: kafka-channel-controller-config + configMap: + name: kafka-channel-controller-config diff --git a/pkg/provisioners/kafka/cmd/controller/main.go b/pkg/provisioners/kafka/cmd/controller/main.go new file mode 100644 index 00000000000..9fceebdefb0 --- /dev/null +++ b/pkg/provisioners/kafka/cmd/controller/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "flag" + "os" + + istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/runtime" + _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/manager" + logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" + "sigs.k8s.io/controller-runtime/pkg/runtime/signals" + + eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/provisioners" + provisionerController "github.com/knative/eventing/pkg/provisioners/kafka/controller" + "github.com/knative/eventing/pkg/provisioners/kafka/controller/channel" +) + +// SchemeFunc adds types to a Scheme. +type SchemeFunc func(*runtime.Scheme) error + +// ProvideFunc adds a controller to a Manager. +type ProvideFunc func(mgr manager.Manager, config *provisionerController.KafkaProvisionerConfig, logger *zap.Logger) (controller.Controller, error) + +func main() { + flag.Parse() + logf.SetLogger(logf.ZapLogger(false)) + + logger := provisioners.NewProvisionerLoggerFromConfig(provisioners.NewLoggingConfig()) + defer logger.Sync() + + // Setup a Manager + mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) + if err != nil { + logger.Error(err, "unable to run controller manager") + os.Exit(1) + } + + // Add custom types to this array to get them into the manager's scheme. + schemeFuncs := []SchemeFunc{ + eventingv1alpha.AddToScheme, + istiov1alpha3.AddToScheme, + } + for _, schemeFunc := range schemeFuncs { + schemeFunc(mgr.GetScheme()) + } + + // Add each controller's ProvideController func to this list to have the + // manager run it. 
+ providers := []ProvideFunc{ + provisionerController.ProvideController, + channel.ProvideController, + } + + // TODO the underlying config map needs to be watched and the config should be reloaded if there is a change. + provisionerConfig, err := provisionerController.GetProvisionerConfig("/etc/config-provisioner") + + if err != nil { + logger.Error(err, "unable to run controller manager") + os.Exit(1) + } + + for _, provider := range providers { + if _, err := provider(mgr, provisionerConfig, logger.Desugar()); err != nil { + logger.Error(err, "unable to run controller manager") + os.Exit(1) + } + } + + // Start blocks forever. + err = mgr.Start(signals.SetupSignalHandler()) + if err != nil { + logger.Fatal("Manager.Start() returned an error", zap.Error(err)) + } +} diff --git a/pkg/provisioners/kafka/cmd/dispatcher/main.go b/pkg/provisioners/kafka/cmd/dispatcher/main.go new file mode 100644 index 00000000000..b9a2ea866ef --- /dev/null +++ b/pkg/provisioners/kafka/cmd/dispatcher/main.go @@ -0,0 +1,99 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "fmt" + "log" + "os" + + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/client/config" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/runtime/signals" + + provisionerController "github.com/knative/eventing/pkg/provisioners/kafka/controller" + "github.com/knative/eventing/pkg/provisioners/kafka/dispatcher" + "github.com/knative/eventing/pkg/sidecar/configmap/watcher" + "github.com/knative/eventing/pkg/system" +) + +func main() { + + configMapName := os.Getenv("DISPATCHER_CONFIGMAP_NAME") + if configMapName == "" { + configMapName = provisionerController.DispatcherConfigMapName + } + configMapNamespace := os.Getenv("DISPATCHER_CONFIGMAP_NAMESPACE") + if configMapNamespace == "" { + configMapNamespace = system.Namespace + } + + logger, err := zap.NewProduction() + if err != nil { + log.Fatalf("unable to create logger: %v", err) + } + + provisionerConfig, err := provisionerController.GetProvisionerConfig("/etc/config-provisioner") + if err != nil { + logger.Fatal("unable to load provisioner config", zap.Error(err)) + } + + mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) + if err != nil { + logger.Fatal("unable to create manager.", zap.Error(err)) + } + + kafkaDispatcher, err := dispatcher.NewDispatcher(provisionerConfig.Brokers, logger) + if err != nil { + logger.Fatal("unable to create kafka dispatcher.", zap.Error(err)) + } + + kc, err := kubernetes.NewForConfig(mgr.GetConfig()) + if err != nil { + logger.Fatal("unable to create kubernetes client.", zap.Error(err)) + } + + cmw, err := watcher.NewWatcher(logger, kc, configMapNamespace, configMapName, kafkaDispatcher.UpdateConfig) + if err != nil { + logger.Fatal("unable to create configmap watcher", zap.String("configmap", fmt.Sprintf("%s/%s", configMapNamespace, configMapName))) + } + mgr.Add(cmw) + + // set up 
signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + // Start both the manager (which notices ConfigMap changes) and the HTTP server. + var g errgroup.Group + g.Go(func() error { + // Start blocks forever, so run it in a goroutine. + return mgr.Start(stopCh) + }) + + g.Go(func() error { + // Sets up the message receiver and blocks. + return kafkaDispatcher.Start(stopCh) + }) + + err = g.Wait() + if err != nil { + logger.Error("Either the kafka message receiver or the ConfigMap watcher failed.", zap.Error(err)) + } + +} diff --git a/pkg/provisioners/kafka/controller/channel/provider.go b/pkg/provisioners/kafka/controller/channel/provider.go index be24497f1cb..a5a220c1ad4 100644 --- a/pkg/provisioners/kafka/controller/channel/provider.go +++ b/pkg/provisioners/kafka/controller/channel/provider.go @@ -21,6 +21,7 @@ import ( istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -31,6 +32,7 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" common "github.com/knative/eventing/pkg/provisioners/kafka/controller" + "github.com/knative/eventing/pkg/system" ) const ( @@ -39,11 +41,19 @@ const ( controllerAgentName = "kafka-provisioner-channel-controller" ) +var ( + defaultConfigMapKey = types.NamespacedName{ + Namespace: system.Namespace, + Name: common.DispatcherConfigMapName, + } +) + type reconciler struct { - client client.Client - recorder record.EventRecorder - logger *zap.Logger - config *common.KafkaProvisionerConfig + client client.Client + recorder record.EventRecorder + logger *zap.Logger + config *common.KafkaProvisionerConfig + configMapKey client.ObjectKey // Using a shared kafkaClusterAdmin does not work currently because of an issue with // Shopify/sarama, see https://github.com/Shopify/sarama/issues/1162. kafkaClusterAdmin sarama.ClusterAdmin @@ -57,9 +67,10 @@ func ProvideController(mgr manager.Manager, config *common.KafkaProvisionerConfi // Setup a new controller to Reconcile Channel. 
c, err := controller.New(controllerAgentName, mgr, controller.Options{ Reconciler: &reconciler{ - recorder: mgr.GetRecorder(controllerAgentName), - logger: logger, - config: config, + recorder: mgr.GetRecorder(controllerAgentName), + logger: logger, + config: config, + configMapKey: defaultConfigMapKey, }, }) if err != nil { diff --git a/pkg/provisioners/kafka/controller/channel/reconcile.go b/pkg/provisioners/kafka/controller/channel/reconcile.go index 2dd03cf53c5..c6065f13772 100644 --- a/pkg/provisioners/kafka/controller/channel/reconcile.go +++ b/pkg/provisioners/kafka/controller/channel/reconcile.go @@ -23,16 +23,21 @@ import ( "github.com/Shopify/sarama" "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" eventingController "github.com/knative/eventing/pkg/controller" util "github.com/knative/eventing/pkg/provisioners" "github.com/knative/eventing/pkg/provisioners/kafka/controller" + "github.com/knative/eventing/pkg/sidecar/configmap" + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + "k8s.io/apimachinery/pkg/api/equality" ) const ( @@ -51,7 +56,7 @@ type channelArgs struct { func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) { ctx := context.TODO() r.logger.Info("Reconciling channel", zap.Any("request", request)) - channel := &v1alpha1.Channel{} + channel := &eventingv1alpha1.Channel{} err := r.client.Get(context.TODO(), request.NamespacedName, channel) // The Channel may have been deleted since it was added to the workqueue. If so, there is @@ -77,8 +82,8 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err if err != nil { return reconcile.Result{}, err } - provisionerRef := channel.Spec.Provisioner - if provisionerRef.Name != clusterChannelProvisioner.Name { + + if !r.shouldReconcile(channel, clusterChannelProvisioner) { return reconcile.Result{}, nil } @@ -104,7 +109,13 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err return reconcile.Result{}, err } -func (r *reconciler) reconcile(ctx context.Context, channel *v1alpha1.Channel) error { +func (r *reconciler) reconcile(ctx context.Context, channel *eventingv1alpha1.Channel) error { + + // We always need to sync the Channel config, so do it first. + if err := r.syncChannelConfig(ctx); err != nil { + r.logger.Info("error syncing the Channel config", zap.Error(err)) + return err + } // We don't currently initialize r.kafkaClusterAdmin, hence we end up creating the cluster admin client every time. // This is because of an issue with Shopify/sarama. See https://github.com/Shopify/sarama/issues/1162. 
@@ -178,7 +189,11 @@ func (r *reconciler) reconcile(ctx context.Context, channel *v1alpha1.Channel) e return nil } -func (r *reconciler) provisionChannel(channel *v1alpha1.Channel, kafkaClusterAdmin sarama.ClusterAdmin) error { +func (r *reconciler) shouldReconcile(channel *eventingv1alpha1.Channel, clusterChannelProvisioner *eventingv1alpha1.ClusterChannelProvisioner) bool { + return channel.Spec.Provisioner.Name == clusterChannelProvisioner.Name +} + +func (r *reconciler) provisionChannel(channel *eventingv1alpha1.Channel, kafkaClusterAdmin sarama.ClusterAdmin) error { topicName := topicName(channel) r.logger.Info("creating topic on kafka cluster", zap.String("topic", topicName)) @@ -210,7 +225,7 @@ func (r *reconciler) provisionChannel(channel *v1alpha1.Channel, kafkaClusterAdm return err } -func (r *reconciler) deprovisionChannel(channel *v1alpha1.Channel, kafkaClusterAdmin sarama.ClusterAdmin) error { +func (r *reconciler) deprovisionChannel(channel *eventingv1alpha1.Channel, kafkaClusterAdmin sarama.ClusterAdmin) error { topicName := topicName(channel) r.logger.Info("deleting topic on kafka cluster", zap.String("topic", topicName)) @@ -225,8 +240,8 @@ func (r *reconciler) deprovisionChannel(channel *v1alpha1.Channel, kafkaClusterA return err } -func (r *reconciler) getClusterChannelProvisioner() (*v1alpha1.ClusterChannelProvisioner, error) { - clusterChannelProvisioner := &v1alpha1.ClusterChannelProvisioner{} +func (r *reconciler) getClusterChannelProvisioner() (*eventingv1alpha1.ClusterChannelProvisioner, error) { + clusterChannelProvisioner := &eventingv1alpha1.ClusterChannelProvisioner{} objKey := client.ObjectKey{ Name: controller.Name, } @@ -236,6 +251,115 @@ func (r *reconciler) getClusterChannelProvisioner() (*v1alpha1.ClusterChannelPro return clusterChannelProvisioner, nil } +func (r *reconciler) syncChannelConfig(ctx context.Context) error { + channels, err := r.listAllChannels(ctx) + if err != nil { + r.logger.Info("Unable to list channels", zap.Error(err)) + return err + } + config := multiChannelFanoutConfig(channels) + return r.writeConfigMap(ctx, config) +} + +func (r *reconciler) writeConfigMap(ctx context.Context, config *multichannelfanout.Config) error { + logger := r.logger.With(zap.Any("configMap", r.configMapKey)) + + updated, err := configmap.SerializeConfig(*config) + if err != nil { + r.logger.Error("Unable to serialize config", zap.Error(err), zap.Any("config", config)) + return err + } + + cm := &corev1.ConfigMap{} + err = r.client.Get(ctx, r.configMapKey, cm) + if errors.IsNotFound(err) { + cm = r.createNewConfigMap(updated) + err = r.client.Create(ctx, cm) + if err != nil { + logger.Info("Unable to create ConfigMap", zap.Error(err)) + return err + } + } + if err != nil { + logger.Info("Unable to get ConfigMap", zap.Error(err)) + return err + } + + if equality.Semantic.DeepEqual(cm.Data, updated) { + // Nothing to update. 
+ return nil + } + + cm.Data = updated + return r.client.Update(ctx, cm) +} + +func (r *reconciler) createNewConfigMap(data map[string]string) *corev1.ConfigMap { + return &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: r.configMapKey.Namespace, + Name: r.configMapKey.Name, + }, + Data: data, + } +} + +func multiChannelFanoutConfig(channels []eventingv1alpha1.Channel) *multichannelfanout.Config { + cc := make([]multichannelfanout.ChannelConfig, 0) + for _, c := range channels { + channelConfig := multichannelfanout.ChannelConfig{ + Namespace: c.Namespace, + Name: c.Name, + } + if c.Spec.Subscribable != nil { + channelConfig.FanoutConfig = fanout.Config{ + Subscriptions: c.Spec.Subscribable.Subscribers, + } + } + cc = append(cc, channelConfig) + } + return &multichannelfanout.Config{ + ChannelConfigs: cc, + } +} + +func (r *reconciler) listAllChannels(ctx context.Context) ([]eventingv1alpha1.Channel, error) { + clusterChannelProvisioner, err := r.getClusterChannelProvisioner() + if err != nil { + return nil, err + } + + channels := make([]eventingv1alpha1.Channel, 0) + + opts := &client.ListOptions{ + // TODO this is here because the fake client needs it. Remove this when it's no longer + // needed. + Raw: &metav1.ListOptions{ + TypeMeta: metav1.TypeMeta{ + APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + }, + }, + } + for { + cl := &eventingv1alpha1.ChannelList{} + if err := r.client.List(ctx, opts, cl); err != nil { + return nil, err + } + + for _, c := range cl.Items { + if r.shouldReconcile(&c, clusterChannelProvisioner) { + channels = append(channels, c) + } + } + if cl.Continue != "" { + opts.Raw.Continue = cl.Continue + } else { + return channels, nil + } + } +} + func createKafkaAdminClient(config *controller.KafkaProvisionerConfig) (sarama.ClusterAdmin, error) { saramaConf := sarama.NewConfig() saramaConf.Version = sarama.V1_1_0_0 @@ -243,7 +367,7 @@ func createKafkaAdminClient(config *controller.KafkaProvisionerConfig) (sarama.C return sarama.NewClusterAdmin(config.Brokers, saramaConf) } -func topicName(channel *v1alpha1.Channel) string { +func topicName(channel *eventingv1alpha1.Channel) string { return fmt.Sprintf("%s.%s", channel.Namespace, channel.Name) } diff --git a/pkg/provisioners/kafka/controller/provider.go b/pkg/provisioners/kafka/controller/provider.go index 24885ee585e..0f6ca5631f2 100644 --- a/pkg/provisioners/kafka/controller/provider.go +++ b/pkg/provisioners/kafka/controller/provider.go @@ -35,6 +35,10 @@ const ( // controllerAgentName is the string used by this controller to identify // itself when creating events. controllerAgentName = "kafka-provisioner-controller" + // ConfigMapName is the name of the ConfigMap in the knative-eventing namespace that contains + // the subscription information for all kafka Channels. The Provisioner writes to it and the + // Dispatcher reads from it. 
+ DispatcherConfigMapName = "kafka-channel-dispatcher" ) type reconciler struct { diff --git a/pkg/provisioners/kafka/controller/util.go b/pkg/provisioners/kafka/controller/util.go new file mode 100644 index 00000000000..7184e655b3f --- /dev/null +++ b/pkg/provisioners/kafka/controller/util.go @@ -0,0 +1,39 @@ +package controller + +import ( + "fmt" + "strings" + + "github.com/knative/pkg/configmap" +) + +const ( + BrokerConfigMapKey = "bootstrap_servers" +) + +// GetProvisionerConfig returns the details of the associated ClusterChannelProvisioner object +func GetProvisionerConfig(path string) (*KafkaProvisionerConfig, error) { + configMap, err := configmap.Load(path) + if err != nil { + return nil, fmt.Errorf("error loading provisioner configuration: %s", err) + } + + if len(configMap) == 0 { + return nil, fmt.Errorf("missing provisioner configuration") + } + + config := &KafkaProvisionerConfig{} + + if brokers, ok := configMap[BrokerConfigMapKey]; ok { + bootstrapServers := strings.Split(brokers, ",") + for _, s := range bootstrapServers { + if len(s) == 0 { + return nil, fmt.Errorf("empty %s value in provisioner configuration", BrokerConfigMapKey) + } + } + config.Brokers = bootstrapServers + return config, nil + } + + return nil, fmt.Errorf("missing key %s in provisioner configuration", BrokerConfigMapKey) +} diff --git a/pkg/provisioners/kafka/controller/util_test.go b/pkg/provisioners/kafka/controller/util_test.go new file mode 100644 index 00000000000..dfbd8d48f1f --- /dev/null +++ b/pkg/provisioners/kafka/controller/util_test.go @@ -0,0 +1,99 @@ +package controller + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestGetProvisionerConfigBrokers(t *testing.T) { + + testCases := []struct { + name string + data map[string]string + path string + getError string + expected *KafkaProvisionerConfig + }{ + { + name: "invalid config path", + path: "/tmp/does_not_exist", + getError: "error loading provisioner configuration: lstat /tmp/does_not_exist: no such file or directory", + }, + { + name: "configmap with no data", + data: map[string]string{}, + getError: "missing provisioner configuration", + }, + { + name: "configmap with no bootstrap_servers key", + data: map[string]string{"key": "value"}, + getError: "missing key bootstrap_servers in provisioner configuration", + }, + { + name: "configmap with empty bootstrap_servers value", + data: map[string]string{"bootstrap_servers": ""}, + getError: "empty bootstrap_servers value in provisioner configuration", + }, + { + name: "single bootstrap_servers", + data: map[string]string{"bootstrap_servers": "kafkabroker.kafka:9092"}, + expected: &KafkaProvisionerConfig{ + Brokers: []string{"kafkabroker.kafka:9092"}, + }, + }, + { + name: "multiple bootstrap_servers", + data: map[string]string{"bootstrap_servers": "kafkabroker1.kafka:9092,kafkabroker2.kafka:9092"}, + expected: &KafkaProvisionerConfig{ + Brokers: []string{"kafkabroker1.kafka:9092", "kafkabroker2.kafka:9092"}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Logf("Running %s", t.Name()) + + if tc.path == "" { + dir, err := ioutil.TempDir("", "util_test") + if err != nil { + t.Errorf("error creating tmp directory") + } + defer os.RemoveAll(dir) + + for k, v := range tc.data { + if k != "" { + path := filepath.Join(dir, k) + if err := ioutil.WriteFile(path, []byte(v), 0600); err != nil { + t.Errorf("error writing file %s: %s", path, err) + } + } + } + + tc.path = dir + } + got, err := 
GetProvisionerConfig(tc.path) + + if tc.getError != "" { + if err == nil { + t.Errorf("Expected Config error: '%v'. Actual nil", tc.getError) + } else if err.Error() != tc.getError { + t.Errorf("Unexpected Config error. Expected '%v'. Actual '%v'", tc.getError, err) + } + return + } else if err != nil { + t.Errorf("Unexpected Config error. Expected nil. Actual '%v'", err) + } + + if diff := cmp.Diff(tc.expected, got); diff != "" { + t.Errorf("unexpected Config (-want, +got) = %v", diff) + } + + }) + } + +} diff --git a/pkg/provisioners/kafka/dispatcher/dispatcher.go b/pkg/provisioners/kafka/dispatcher/dispatcher.go new file mode 100644 index 00000000000..b323425adaf --- /dev/null +++ b/pkg/provisioners/kafka/dispatcher/dispatcher.go @@ -0,0 +1,298 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package dispatcher + +import ( + "errors" + "fmt" + "sync" + "sync/atomic" + + "github.com/Shopify/sarama" + "github.com/bsm/sarama-cluster" + "github.com/google/go-cmp/cmp" + "go.uber.org/zap" + + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + "github.com/knative/eventing/pkg/buses" + "github.com/knative/eventing/pkg/provisioners/kafka/controller" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" +) + +type KafkaDispatcher struct { + config atomic.Value + updateLock sync.Mutex + + receiver *buses.MessageReceiver + dispatcher *buses.MessageDispatcher + + kafkaAsyncProducer sarama.AsyncProducer + kafkaConsumers map[buses.ChannelReference]map[subscription]KafkaConsumer + kafkaCluster KafkaCluster + + logger *zap.Logger +} + +type KafkaConsumer interface { + Messages() <-chan *sarama.ConsumerMessage + MarkOffset(msg *sarama.ConsumerMessage, metadata string) + Close() (err error) +} + +type KafkaCluster interface { + NewConsumer(groupID string, topics []string) (KafkaConsumer, error) +} + +type saramaCluster struct { + kafkaBrokers []string +} + +func (c *saramaCluster) NewConsumer(groupID string, topics []string) (KafkaConsumer, error) { + consumerConfig := cluster.NewConfig() + consumerConfig.Version = sarama.V1_1_0_0 + return cluster.NewConsumer(c.kafkaBrokers, groupID, topics, consumerConfig) +} + +type subscription struct { + Namespace string + Name string + SubscriberURI string + ReplyURI string +} + +// ConfigDiffs diffs the new config with the existing config. If there are no differences, then the +// empty string is returned. If there are differences, then a non-empty string is returned +// describing the differences. 
+func (d *KafkaDispatcher) ConfigDiff(updated *multichannelfanout.Config) string { + return cmp.Diff(d.getConfig(), updated) +} + +func (d *KafkaDispatcher) UpdateConfig(config *multichannelfanout.Config) error { + if config == nil { + return errors.New("nil config") + } + + d.updateLock.Lock() + defer d.updateLock.Unlock() + + if diff := d.ConfigDiff(config); diff != "" { + d.logger.Info("Updating config (-old +new)", zap.String("diff", diff)) + + newSubs := make(map[subscription]bool) + + // Subscribe to new subscriptions + for _, cc := range config.ChannelConfigs { + channelRef := buses.ChannelReference{ + Name: cc.Name, + Namespace: cc.Namespace, + } + for _, subSpec := range cc.FanoutConfig.Subscriptions { + sub := newSubscription(subSpec) + if _, ok := d.kafkaConsumers[channelRef][sub]; ok { + // subscribe can be called multiple times for the same subscription, + // unsubscribe before we resubscribe. + // TODO The behavior to unsubscribe and re-subscribe is retained from the older kafka bus + // implementation. It is not clear as to why this is needed instead of just re-using the same + // consumer. + err := d.unsubscribe(channelRef, sub) + if err != nil { + return err + } + } + d.subscribe(channelRef, sub) + newSubs[sub] = true + } + } + + // Unsubscribe and close consumer for any deleted subscriptions + for channelRef, subMap := range d.kafkaConsumers { + for sub, _ := range subMap { + if ok := newSubs[sub]; !ok { + d.unsubscribe(channelRef, sub) + } + } + } + + // Update the config so that it can be used for comparison during next sync + d.setConfig(config) + } + return nil +} + +// Run starts the kafka dispatcher's message processing. +func (d *KafkaDispatcher) Start(stopCh <-chan struct{}) error { + if d.receiver == nil { + return fmt.Errorf("message receiver is not set") + } + + if d.kafkaAsyncProducer == nil { + return fmt.Errorf("kafkaAsyncProducer is not set") + } + + go func() { + for { + select { + case e := <-d.kafkaAsyncProducer.Errors(): + d.logger.Warn("Got", zap.Error(e)) + case s := <-d.kafkaAsyncProducer.Successes(): + d.logger.Info("Sent", zap.Any("success", s)) + case <-stopCh: + return + } + } + }() + + d.receiver.Run(stopCh) + + return nil +} + +func (d *KafkaDispatcher) subscribe(channelRef buses.ChannelReference, sub subscription) error { + + d.logger.Info("Subscribing", zap.Any("channelRef", channelRef), zap.Any("subscription", sub)) + + topicName := topicName(channelRef) + + group := fmt.Sprintf("%s.%s.%s", controller.Name, sub.Namespace, sub.Name) + consumer, err := d.kafkaCluster.NewConsumer(group, []string{topicName}) + if err != nil { + return err + } + + channelMap, ok := d.kafkaConsumers[channelRef] + if !ok { + channelMap = make(map[subscription]KafkaConsumer) + d.kafkaConsumers[channelRef] = channelMap + } + channelMap[sub] = consumer + + go func() { + for { + msg, more := <-consumer.Messages() + if more { + d.logger.Info("Dispatching a message for subscription", zap.Any("channelRef", channelRef), zap.Any("subscription", sub)) + message := fromKafkaMessage(msg) + err := d.dispatchMessage(message, sub) + if err != nil { + d.logger.Warn("Got error trying to dispatch message", zap.Error(err)) + } + // TODO: handle errors with pluggable strategy + consumer.MarkOffset(msg, "") // Mark message as processed + } else { + break + } + } + d.logger.Info("Consumer for subscription stopped", zap.Any("channelRef", channelRef), zap.Any("subscription", sub)) + }() + + return nil +} + +func (d *KafkaDispatcher) unsubscribe(channel buses.ChannelReference, sub 
subscription) error { + d.logger.Info("Unsubscribing from channel", zap.Any("channel", channel), zap.Any("subscription", sub)) + if consumer, ok := d.kafkaConsumers[channel][sub]; ok { + delete(d.kafkaConsumers[channel], sub) + return consumer.Close() + } + return nil +} + +// dispatchMessage sends the request to exactly one subscription. It handles both the `call` and +// the `sink` portions of the subscription. +func (d *KafkaDispatcher) dispatchMessage(m *buses.Message, sub subscription) error { + return d.dispatcher.DispatchMessage(m, sub.SubscriberURI, sub.ReplyURI, buses.DispatchDefaults{}) +} + +func (d *KafkaDispatcher) getConfig() *multichannelfanout.Config { + return d.config.Load().(*multichannelfanout.Config) +} + +func (d *KafkaDispatcher) setConfig(config *multichannelfanout.Config) { + d.config.Store(config) +} + +func NewDispatcher(brokers []string, logger *zap.Logger) (*KafkaDispatcher, error) { + + conf := sarama.NewConfig() + conf.Version = sarama.V1_1_0_0 + conf.ClientID = controller.Name + "-dispatcher" + client, err := sarama.NewClient(brokers, conf) + if err != nil { + return nil, fmt.Errorf("unable to create kafka client: %v", err) + } + + producer, err := sarama.NewAsyncProducerFromClient(client) + if err != nil { + return nil, fmt.Errorf("unable to create kafka producer: %v", err) + } + + dispatcher := &KafkaDispatcher{ + dispatcher: buses.NewMessageDispatcher(logger.Sugar()), + + kafkaCluster: &saramaCluster{kafkaBrokers: brokers}, + kafkaConsumers: make(map[buses.ChannelReference]map[subscription]KafkaConsumer), + kafkaAsyncProducer: producer, + + logger: logger, + } + receiverFunc := buses.NewMessageReceiver( + func(channel buses.ChannelReference, message *buses.Message) error { + dispatcher.kafkaAsyncProducer.Input() <- toKafkaMessage(channel, message) + return nil + }, logger.Sugar()) + dispatcher.receiver = receiverFunc + dispatcher.setConfig(&multichannelfanout.Config{}) + return dispatcher, nil +} + +func fromKafkaMessage(kafkaMessage *sarama.ConsumerMessage) *buses.Message { + headers := make(map[string]string) + for _, header := range kafkaMessage.Headers { + headers[string(header.Key)] = string(header.Value) + } + message := buses.Message{ + Headers: headers, + Payload: kafkaMessage.Value, + } + return &message +} + +func toKafkaMessage(channel buses.ChannelReference, message *buses.Message) *sarama.ProducerMessage { + kafkaMessage := sarama.ProducerMessage{ + Topic: topicName(channel), + Value: sarama.ByteEncoder(message.Payload), + } + for h, v := range message.Headers { + kafkaMessage.Headers = append(kafkaMessage.Headers, sarama.RecordHeader{ + Key: []byte(h), + Value: []byte(v), + }) + } + return &kafkaMessage +} + +func topicName(channel buses.ChannelReference) string { + return fmt.Sprintf("%s.%s", channel.Namespace, channel.Name) +} + +func newSubscription(spec eventingduck.ChannelSubscriberSpec) subscription { + return subscription{ + Name: spec.Ref.Name, + Namespace: spec.Ref.Namespace, + SubscriberURI: spec.SubscriberURI, + ReplyURI: spec.ReplyURI, + } +} diff --git a/pkg/provisioners/kafka/dispatcher/dispatcher_test.go b/pkg/provisioners/kafka/dispatcher/dispatcher_test.go new file mode 100644 index 00000000000..3e2b6d39dc6 --- /dev/null +++ b/pkg/provisioners/kafka/dispatcher/dispatcher_test.go @@ -0,0 +1,476 @@ +package dispatcher + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/Shopify/sarama" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "go.uber.org/zap" + 
"k8s.io/api/core/v1" + + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + "github.com/knative/eventing/pkg/buses" + "github.com/knative/eventing/pkg/sidecar/fanout" + "github.com/knative/eventing/pkg/sidecar/multichannelfanout" +) + +type mockConsumer struct { + message chan *sarama.ConsumerMessage +} + +func (c *mockConsumer) Messages() <-chan *sarama.ConsumerMessage { + return c.message +} + +func (c *mockConsumer) Close() error { + return nil +} + +func (c *mockConsumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) { + return +} + +type mockSaramaCluster struct { + // closed closes the message channel so that it doesn't block during the test + closed bool + // Handle to the latest created consumer, useful to access underlying message chan + consumerChannel chan *sarama.ConsumerMessage + // createErr will return an error when creating a consumer + createErr bool +} + +func (c *mockSaramaCluster) NewConsumer(groupID string, topics []string) (KafkaConsumer, error) { + if c.createErr { + return nil, fmt.Errorf("error creating consumer") + } + consumer := &mockConsumer{ + message: make(chan *sarama.ConsumerMessage), + } + if c.closed { + close(consumer.message) + } + c.consumerChannel = consumer.message + return consumer, nil +} + +func TestDispatcher_UpdateConfig(t *testing.T) { + testCases := []struct { + name string + oldConfig *multichannelfanout.Config + newConfig *multichannelfanout.Config + subscribes []string + unsubscribes []string + createErr string + }{ + { + name: "nil config", + oldConfig: &multichannelfanout.Config{}, + newConfig: nil, + createErr: "nil config", + }, + { + name: "same config", + oldConfig: &multichannelfanout.Config{}, + newConfig: &multichannelfanout.Config{}, + }, + { + name: "config with no subscription", + oldConfig: &multichannelfanout.Config{}, + newConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "test-channel", + }, + }, + }, + }, + { + name: "single channel w/ new subscriptions", + oldConfig: &multichannelfanout.Config{}, + newConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "test-channel", + FanoutConfig: fanout.Config{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ + { + Ref: &v1.ObjectReference{ + Name: "subscription-1", + }, + SubscriberURI: "http://test/subscriber", + }, + { + Ref: &v1.ObjectReference{ + Name: "subscription-2", + }, + SubscriberURI: "http://test/subscriber", + }, + }, + }, + }, + }, + }, + subscribes: []string{"subscription-1", "subscription-2"}, + }, + { + name: "single channel w/ existing subscriptions", + oldConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "test-channel", + FanoutConfig: fanout.Config{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ + { + Ref: &v1.ObjectReference{ + Name: "subscription-1", + }, + SubscriberURI: "http://test/subscriber", + }, + { + Ref: &v1.ObjectReference{ + Name: "subscription-2", + }, + SubscriberURI: "http://test/subscriber", + }}}}}, + }, + newConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "test-channel", + FanoutConfig: fanout.Config{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ + { + Ref: &v1.ObjectReference{ + Name: "subscription-2", + }, + SubscriberURI: "http://test/subscriber", + }, + { + Ref: &v1.ObjectReference{ + Name: 
"subscription-3", + }, + SubscriberURI: "http://test/subscriber", + }, + }, + }, + }, + }, + }, + subscribes: []string{"subscription-2", "subscription-3"}, + unsubscribes: []string{"subscription-1"}, + }, + { + name: "multi channel w/old and new subscriptions", + oldConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "test-channel-1", + FanoutConfig: fanout.Config{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ + { + Ref: &v1.ObjectReference{ + Name: "subscription-1", + }, + SubscriberURI: "http://test/subscriber", + }, + { + Ref: &v1.ObjectReference{ + Name: "subscription-2", + }, + SubscriberURI: "http://test/subscriber", + }, + }, + }}}}, + newConfig: &multichannelfanout.Config{ + ChannelConfigs: []multichannelfanout.ChannelConfig{ + { + Namespace: "default", + Name: "test-channel-1", + FanoutConfig: fanout.Config{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ + { + Ref: &v1.ObjectReference{ + Name: "subscription-1", + }, + SubscriberURI: "http://test/subscriber", + }, + }, + }, + }, + { + Namespace: "default", + Name: "test-channel-2", + FanoutConfig: fanout.Config{ + Subscriptions: []eventingduck.ChannelSubscriberSpec{ + { + Ref: &v1.ObjectReference{ + Name: "subscription-3", + }, + SubscriberURI: "http://test/subscriber", + }, + { + Ref: &v1.ObjectReference{ + Name: "subscription-4", + }, + SubscriberURI: "http://test/subscriber", + }, + }, + }, + }, + }, + }, + subscribes: []string{"subscription-1", "subscription-3", "subscription-4"}, + unsubscribes: []string{"subscription-2"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Logf("Running %s", t.Name()) + d := &KafkaDispatcher{ + kafkaCluster: &mockSaramaCluster{closed: true}, + kafkaConsumers: make(map[buses.ChannelReference]map[subscription]KafkaConsumer), + + logger: zap.NewNop(), + } + d.setConfig(&multichannelfanout.Config{}) + + // Initialize using oldConfig + err := d.UpdateConfig(tc.oldConfig) + if err != nil { + t.Errorf("unexpected error: %s", err) + } + oldSubscribers := make(map[string]bool) + for _, subMap := range d.kafkaConsumers { + for sub, _ := range subMap { + oldSubscribers[sub.Name] = true + } + } + for _, sub := range tc.unsubscribes { + if ok := oldSubscribers[sub]; !ok { + t.Errorf("subscription %s was never subscribed", sub) + } + } + + // Update with new config + err = d.UpdateConfig(tc.newConfig) + if tc.createErr != "" { + if err == nil { + t.Errorf("Expected UpdateConfig error: '%v'. Actual nil", tc.createErr) + } else if err.Error() != tc.createErr { + t.Errorf("Unexpected UpdateConfig error. Expected '%v'. Actual '%v'", tc.createErr, err) + } + return + } else if err != nil { + t.Errorf("Unexpected UpdateConfig error. Expected nil. 
Actual '%v'", err) + } + + var newSubscribers []string + for _, subMap := range d.kafkaConsumers { + for sub, _ := range subMap { + newSubscribers = append(newSubscribers, sub.Name) + } + } + + if diff := cmp.Diff(tc.subscribes, newSubscribers, sortStrings); diff != "" { + t.Errorf("unexpected subscribers (-want, +got) = %v", diff) + } + + }) + } +} + +func TestFromKafkaMessage(t *testing.T) { + data := []byte("data") + kafkaMessage := &sarama.ConsumerMessage{ + Headers: []*sarama.RecordHeader{ + { + Key: []byte("k1"), + Value: []byte("v1"), + }, + }, + Value: data, + } + want := &buses.Message{ + Headers: map[string]string{ + "k1": "v1", + }, + Payload: data, + } + got := fromKafkaMessage(kafkaMessage) + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected message (-want, +got) = %v", diff) + } +} + +func TestToKafkaMessage(t *testing.T) { + data := []byte("data") + channelRef := buses.ChannelReference{ + Name: "test-channel", + Namespace: "test-ns", + } + msg := &buses.Message{ + Headers: map[string]string{ + "k1": "v1", + }, + Payload: data, + } + want := &sarama.ProducerMessage{ + Topic: "test-ns.test-channel", + Headers: []sarama.RecordHeader{ + { + Key: []byte("k1"), + Value: []byte("v1"), + }, + }, + Value: sarama.ByteEncoder(data), + } + got := toKafkaMessage(channelRef, msg) + if diff := cmp.Diff(want, got, cmpopts.IgnoreUnexported(sarama.ProducerMessage{})); diff != "" { + t.Errorf("unexpected message (-want, +got) = %v", diff) + } +} + +type dispatchTestHandler struct { + t *testing.T + payload []byte + done chan bool +} + +func (h *dispatchTestHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + defer r.Body.Close() + body, err := ioutil.ReadAll(r.Body) + if err != nil { + h.t.Error("Failed to read the request body") + } + if diff := cmp.Diff(h.payload, body); diff != "" { + h.t.Errorf("unexpected body (-want, +got) = %v", diff) + } + h.done <- true +} + +func TestSubscribe(t *testing.T) { + sc := &mockSaramaCluster{} + data := []byte("data") + d := &KafkaDispatcher{ + kafkaCluster: sc, + kafkaConsumers: make(map[buses.ChannelReference]map[subscription]KafkaConsumer), + dispatcher: buses.NewMessageDispatcher(zap.NewNop().Sugar()), + logger: zap.NewNop(), + } + + testHandler := &dispatchTestHandler{ + t: t, + payload: data, + done: make(chan bool)} + + server := httptest.NewServer(testHandler) + defer server.Close() + + channelRef := buses.ChannelReference{ + Name: "test-channel", + Namespace: "test-ns", + } + + subRef := subscription{ + Name: "test-sub", + Namespace: "test-ns", + SubscriberURI: server.URL[7:], + } + err := d.subscribe(channelRef, subRef) + if err != nil { + t.Errorf("unexpected error %s", err) + } + defer close(sc.consumerChannel) + sc.consumerChannel <- &sarama.ConsumerMessage{ + Headers: []*sarama.RecordHeader{ + { + Key: []byte("k1"), + Value: []byte("v1"), + }, + }, + Value: data, + } + + <-testHandler.done + +} + +func TestSubscribeError(t *testing.T) { + sc := &mockSaramaCluster{ + createErr: true, + } + d := &KafkaDispatcher{ + kafkaCluster: sc, + logger: zap.NewNop(), + } + + channelRef := buses.ChannelReference{ + Name: "test-channel", + Namespace: "test-ns", + } + + subRef := subscription{ + Name: "test-sub", + Namespace: "test-ns", + } + err := d.subscribe(channelRef, subRef) + if err == nil { + t.Errorf("expected error want %s, got %s", "error creating consumer", err) + } +} + +func TestUnsubscribeUnknownSub(t *testing.T) { + sc := &mockSaramaCluster{ + createErr: true, + } + d := 
&KafkaDispatcher{ + kafkaCluster: sc, + logger: zap.NewNop(), + } + + channelRef := buses.ChannelReference{ + Name: "test-channel", + Namespace: "test-ns", + } + + subRef := subscription{ + Name: "test-sub", + Namespace: "test-ns", + } + err := d.unsubscribe(channelRef, subRef) + if err != nil { + t.Errorf("uexpected error %s", err) + } +} + +func TestKafkaDispatcher_Start(t *testing.T) { + d := &KafkaDispatcher{} + err := d.Start(make(chan struct{})) + if err == nil { + t.Errorf("expected error want %s, got %s", "message receiver is not set", err) + } + + d.receiver = buses.NewMessageReceiver(func(channel buses.ChannelReference, message *buses.Message) error { + return nil + }, zap.NewNop().Sugar()) + err = d.Start(make(chan struct{})) + if err == nil { + t.Errorf("expected error want %s, got %s", "kafkaAsyncProducer is not set", err) + } +} + +var sortStrings = cmpopts.SortSlices(func(x, y string) bool { + return x < y +}) diff --git a/third_party/VENDOR-LICENSE b/third_party/VENDOR-LICENSE index a4b76f23095..44283ce9234 100644 --- a/third_party/VENDOR-LICENSE +++ b/third_party/VENDOR-LICENSE @@ -259,6 +259,34 @@ WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +=========================================================== +Import: github.com/knative/eventing/vendor/github.com/bsm/sarama-cluster + +(The MIT License) + +Copyright (c) 2017 Black Square Media Ltd + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +'Software'), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + + =========================================================== Import: github.com/knative/eventing/vendor/github.com/davecgh/go-spew diff --git a/vendor/github.com/bsm/sarama-cluster/LICENSE b/vendor/github.com/bsm/sarama-cluster/LICENSE new file mode 100644 index 00000000000..127751c47a8 --- /dev/null +++ b/vendor/github.com/bsm/sarama-cluster/LICENSE @@ -0,0 +1,22 @@ +(The MIT License) + +Copyright (c) 2017 Black Square Media Ltd + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +'Software'), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. 
+ +THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, +TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/vendor/github.com/bsm/sarama-cluster/balancer.go b/vendor/github.com/bsm/sarama-cluster/balancer.go new file mode 100644 index 00000000000..3aeaecef7a3 --- /dev/null +++ b/vendor/github.com/bsm/sarama-cluster/balancer.go @@ -0,0 +1,170 @@ +package cluster + +import ( + "math" + "sort" + + "github.com/Shopify/sarama" +) + +// NotificationType defines the type of notification +type NotificationType uint8 + +// String describes the notification type +func (t NotificationType) String() string { + switch t { + case RebalanceStart: + return "rebalance start" + case RebalanceOK: + return "rebalance OK" + case RebalanceError: + return "rebalance error" + } + return "unknown" +} + +const ( + UnknownNotification NotificationType = iota + RebalanceStart + RebalanceOK + RebalanceError +) + +// Notification are state events emitted by the consumers on rebalance +type Notification struct { + // Type exposes the notification type + Type NotificationType + + // Claimed contains topic/partitions that were claimed by this rebalance cycle + Claimed map[string][]int32 + + // Released contains topic/partitions that were released as part of this rebalance cycle + Released map[string][]int32 + + // Current are topic/partitions that are currently claimed to the consumer + Current map[string][]int32 +} + +func newNotification(current map[string][]int32) *Notification { + return &Notification{ + Type: RebalanceStart, + Current: current, + } +} + +func (n *Notification) success(current map[string][]int32) *Notification { + o := &Notification{ + Type: RebalanceOK, + Claimed: make(map[string][]int32), + Released: make(map[string][]int32), + Current: current, + } + for topic, partitions := range current { + o.Claimed[topic] = int32Slice(partitions).Diff(int32Slice(n.Current[topic])) + } + for topic, partitions := range n.Current { + o.Released[topic] = int32Slice(partitions).Diff(int32Slice(current[topic])) + } + return o +} + +// -------------------------------------------------------------------- + +type topicInfo struct { + Partitions []int32 + MemberIDs []string +} + +func (info topicInfo) Perform(s Strategy) map[string][]int32 { + if s == StrategyRoundRobin { + return info.RoundRobin() + } + return info.Ranges() +} + +func (info topicInfo) Ranges() map[string][]int32 { + sort.Strings(info.MemberIDs) + + mlen := len(info.MemberIDs) + plen := len(info.Partitions) + res := make(map[string][]int32, mlen) + + for pos, memberID := range info.MemberIDs { + n, i := float64(plen)/float64(mlen), float64(pos) + min := int(math.Floor(i*n + 0.5)) + max := int(math.Floor((i+1)*n + 0.5)) + sub := info.Partitions[min:max] + if len(sub) > 0 { + res[memberID] = sub + } + } + return res +} + +func (info topicInfo) RoundRobin() map[string][]int32 { + sort.Strings(info.MemberIDs) + + mlen := len(info.MemberIDs) + res := make(map[string][]int32, mlen) + for i, pnum := range info.Partitions { + memberID := info.MemberIDs[i%mlen] + res[memberID] = append(res[memberID], pnum) + } + return res +} + +// 
-------------------------------------------------------------------- + +type balancer struct { + client sarama.Client + topics map[string]topicInfo +} + +func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) { + balancer := newBalancer(client) + for memberID, meta := range members { + for _, topic := range meta.Topics { + if err := balancer.Topic(topic, memberID); err != nil { + return nil, err + } + } + } + return balancer, nil +} + +func newBalancer(client sarama.Client) *balancer { + return &balancer{ + client: client, + topics: make(map[string]topicInfo), + } +} + +func (r *balancer) Topic(name string, memberID string) error { + topic, ok := r.topics[name] + if !ok { + nums, err := r.client.Partitions(name) + if err != nil { + return err + } + topic = topicInfo{ + Partitions: nums, + MemberIDs: make([]string, 0, 1), + } + } + topic.MemberIDs = append(topic.MemberIDs, memberID) + r.topics[name] = topic + return nil +} + +func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 { + res := make(map[string]map[string][]int32, 1) + for topic, info := range r.topics { + for memberID, partitions := range info.Perform(s) { + if _, ok := res[memberID]; !ok { + res[memberID] = make(map[string][]int32, 1) + } + res[memberID][topic] = partitions + } + } + return res +} diff --git a/vendor/github.com/bsm/sarama-cluster/client.go b/vendor/github.com/bsm/sarama-cluster/client.go new file mode 100644 index 00000000000..42ffb30c01a --- /dev/null +++ b/vendor/github.com/bsm/sarama-cluster/client.go @@ -0,0 +1,50 @@ +package cluster + +import ( + "errors" + "sync/atomic" + + "github.com/Shopify/sarama" +) + +var errClientInUse = errors.New("cluster: client is already used by another consumer") + +// Client is a group client +type Client struct { + sarama.Client + config Config + + inUse uint32 +} + +// NewClient creates a new client instance +func NewClient(addrs []string, config *Config) (*Client, error) { + if config == nil { + config = NewConfig() + } + + if err := config.Validate(); err != nil { + return nil, err + } + + client, err := sarama.NewClient(addrs, &config.Config) + if err != nil { + return nil, err + } + + return &Client{Client: client, config: *config}, nil +} + +// ClusterConfig returns the cluster configuration. +func (c *Client) ClusterConfig() *Config { + cfg := c.config + return &cfg +} + +func (c *Client) claim() bool { + return atomic.CompareAndSwapUint32(&c.inUse, 0, 1) +} + +func (c *Client) release() { + atomic.CompareAndSwapUint32(&c.inUse, 1, 0) +} diff --git a/vendor/github.com/bsm/sarama-cluster/cluster.go b/vendor/github.com/bsm/sarama-cluster/cluster.go new file mode 100644 index 00000000000..adcf0e9c1cf --- /dev/null +++ b/vendor/github.com/bsm/sarama-cluster/cluster.go @@ -0,0 +1,25 @@ +package cluster + +// Strategy for partition to consumer assignement +type Strategy string + +const ( + // StrategyRange is the default and assigns partition ranges to consumers. + // Example with six partitions and two consumers: + // C1: [0, 1, 2] + // C2: [3, 4, 5] + StrategyRange Strategy = "range" + + // StrategyRoundRobin assigns partitions by alternating over consumers. 
+ // Example with six partitions and two consumers: + // C1: [0, 2, 4] + // C2: [1, 3, 5] + StrategyRoundRobin Strategy = "roundrobin" +) + +// Error instances are wrappers for internal errors with a context and +// may be returned through the consumer's Errors() channel +type Error struct { + Ctx string + error +} diff --git a/vendor/github.com/bsm/sarama-cluster/config.go b/vendor/github.com/bsm/sarama-cluster/config.go new file mode 100644 index 00000000000..084b835f710 --- /dev/null +++ b/vendor/github.com/bsm/sarama-cluster/config.go @@ -0,0 +1,146 @@ +package cluster + +import ( + "regexp" + "time" + + "github.com/Shopify/sarama" +) + +var minVersion = sarama.V0_9_0_0 + +type ConsumerMode uint8 + +const ( + ConsumerModeMultiplex ConsumerMode = iota + ConsumerModePartitions +) + +// Config extends sarama.Config with Group specific namespace +type Config struct { + sarama.Config + + // Group is the namespace for group management properties + Group struct { + + // The strategy to use for the allocation of partitions to consumers (defaults to StrategyRange) + PartitionStrategy Strategy + + // By default, messages and errors from the subscribed topics and partitions are all multiplexed and + // made available through the consumer's Messages() and Errors() channels. + // + // Users who require low-level access can enable ConsumerModePartitions where individual partitions + // are exposed on the Partitions() channel. Messages and errors must then be consumed on the partitions + // themselves. + Mode ConsumerMode + + Offsets struct { + Retry struct { + // The numer retries when committing offsets (defaults to 3). + Max int + } + Synchronization struct { + // The duration allowed for other clients to commit their offsets before resumption in this client, e.g. during a rebalance + // NewConfig sets this to the Consumer.MaxProcessingTime duration of the Sarama configuration + DwellTime time.Duration + } + } + + Session struct { + // The allowed session timeout for registered consumers (defaults to 30s). + // Must be within the allowed server range. + Timeout time.Duration + } + + Heartbeat struct { + // Interval between each heartbeat (defaults to 3s). It should be no more + // than 1/3rd of the Group.Session.Timout setting + Interval time.Duration + } + + // Return specifies which group channels will be populated. If they are set to true, + // you must read from the respective channels to prevent deadlock. + Return struct { + // If enabled, rebalance notification will be returned on the + // Notifications channel (default disabled). + Notifications bool + } + + Topics struct { + // An additional whitelist of topics to subscribe to. + Whitelist *regexp.Regexp + // An additional blacklist of topics to avoid. If set, this will precede over + // the Whitelist setting. + Blacklist *regexp.Regexp + } + + Member struct { + // Custom metadata to include when joining the group. The user data for all joined members + // can be retrieved by sending a DescribeGroupRequest to the broker that is the + // coordinator for the group. + UserData []byte + } + } +} + +// NewConfig returns a new configuration instance with sane defaults. 
+func NewConfig() *Config { + c := &Config{ + Config: *sarama.NewConfig(), + } + c.Group.PartitionStrategy = StrategyRange + c.Group.Offsets.Retry.Max = 3 + c.Group.Offsets.Synchronization.DwellTime = c.Consumer.MaxProcessingTime + c.Group.Session.Timeout = 30 * time.Second + c.Group.Heartbeat.Interval = 3 * time.Second + c.Config.Version = minVersion + return c +} + +// Validate checks a Config instance. It will return a +// sarama.ConfigurationError if the specified values don't make sense. +func (c *Config) Validate() error { + if c.Group.Heartbeat.Interval%time.Millisecond != 0 { + sarama.Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.") + } + if c.Group.Session.Timeout%time.Millisecond != 0 { + sarama.Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.") + } + if c.Group.PartitionStrategy != StrategyRange && c.Group.PartitionStrategy != StrategyRoundRobin { + sarama.Logger.Println("Group.PartitionStrategy is not supported; range will be assumed.") + } + if !c.Version.IsAtLeast(minVersion) { + sarama.Logger.Println("Version is not supported; 0.9. will be assumed.") + c.Version = minVersion + } + if err := c.Config.Validate(); err != nil { + return err + } + + // validate the Group values + switch { + case c.Group.Offsets.Retry.Max < 0: + return sarama.ConfigurationError("Group.Offsets.Retry.Max must be >= 0") + case c.Group.Offsets.Synchronization.DwellTime <= 0: + return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be > 0") + case c.Group.Offsets.Synchronization.DwellTime > 10*time.Minute: + return sarama.ConfigurationError("Group.Offsets.Synchronization.DwellTime must be <= 10m") + case c.Group.Heartbeat.Interval <= 0: + return sarama.ConfigurationError("Group.Heartbeat.Interval must be > 0") + case c.Group.Session.Timeout <= 0: + return sarama.ConfigurationError("Group.Session.Timeout must be > 0") + case !c.Metadata.Full && c.Group.Topics.Whitelist != nil: + return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Whitelist is used") + case !c.Metadata.Full && c.Group.Topics.Blacklist != nil: + return sarama.ConfigurationError("Metadata.Full must be enabled when Group.Topics.Blacklist is used") + } + + // ensure offset is correct + switch c.Consumer.Offsets.Initial { + case sarama.OffsetOldest, sarama.OffsetNewest: + default: + return sarama.ConfigurationError("Consumer.Offsets.Initial must be either OffsetOldest or OffsetNewest") + } + + return nil +} diff --git a/vendor/github.com/bsm/sarama-cluster/consumer.go b/vendor/github.com/bsm/sarama-cluster/consumer.go new file mode 100644 index 00000000000..e7a67dac8b6 --- /dev/null +++ b/vendor/github.com/bsm/sarama-cluster/consumer.go @@ -0,0 +1,919 @@ +package cluster + +import ( + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/Shopify/sarama" +) + +// Consumer is a cluster group consumer +type Consumer struct { + client *Client + ownClient bool + + consumer sarama.Consumer + subs *partitionMap + + consumerID string + groupID string + + memberID string + generationID int32 + membershipMu sync.RWMutex + + coreTopics []string + extraTopics []string + + dying, dead chan none + closeOnce sync.Once + + consuming int32 + messages chan *sarama.ConsumerMessage + errors chan error + partitions chan PartitionConsumer + notifications chan *Notification + + commitMu sync.Mutex +} + +// NewConsumer initializes a new consumer +func NewConsumer(addrs []string, groupID 
string, topics []string, config *Config) (*Consumer, error) { + client, err := NewClient(addrs, config) + if err != nil { + return nil, err + } + + consumer, err := NewConsumerFromClient(client, groupID, topics) + if err != nil { + return nil, err + } + consumer.ownClient = true + return consumer, nil +} + +// NewConsumerFromClient initializes a new consumer from an existing client. +// +// Please note that clients cannot be shared between consumers (due to Kafka internals), +// they can only be re-used which requires the user to call Close() on the first consumer +// before using this method again to initialize another one. Attempts to use a client with +// more than one consumer at a time will return errors. +func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) { + if !client.claim() { + return nil, errClientInUse + } + + consumer, err := sarama.NewConsumerFromClient(client.Client) + if err != nil { + client.release() + return nil, err + } + + sort.Strings(topics) + c := &Consumer{ + client: client, + consumer: consumer, + subs: newPartitionMap(), + groupID: groupID, + + coreTopics: topics, + + dying: make(chan none), + dead: make(chan none), + + messages: make(chan *sarama.ConsumerMessage), + errors: make(chan error, client.config.ChannelBufferSize), + partitions: make(chan PartitionConsumer, 1), + notifications: make(chan *Notification), + } + if err := c.client.RefreshCoordinator(groupID); err != nil { + client.release() + return nil, err + } + + go c.mainLoop() + return c, nil +} + +// Messages returns the read channel for the messages that are returned by +// the broker. +// +// This channel will only return if Config.Group.Mode option is set to +// ConsumerModeMultiplex (default). +func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { return c.messages } + +// Partitions returns the read channels for individual partitions of this broker. +// +// This will channel will only return if Config.Group.Mode option is set to +// ConsumerModePartitions. +// +// The Partitions() channel must be listened to for the life of this consumer; +// when a rebalance happens old partitions will be closed (naturally come to +// completion) and new ones will be emitted. The returned channel will only close +// when the consumer is completely shut down. +func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions } + +// Errors returns a read channel of errors that occur during offset management, if +// enabled. By default, errors are logged and not returned over this channel. If +// you want to implement any custom error handling, set your config's +// Consumer.Return.Errors setting to true, and read from this channel. +func (c *Consumer) Errors() <-chan error { return c.errors } + +// Notifications returns a channel of Notifications that occur during consumer +// rebalancing. Notifications will only be emitted over this channel, if your config's +// Group.Return.Notifications setting to true. +func (c *Consumer) Notifications() <-chan *Notification { return c.notifications } + +// HighWaterMarks returns the current high water marks for each topic and partition +// Consistency between partitions is not guaranteed since high water marks are updated separately. +func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consumer.HighWaterMarks() } + +// MarkOffset marks the provided message as processed, alongside a metadata string +// that represents the state of the partition consumer at that point in time. 
The +// metadata string can be used by another consumer to restore that state, so it +// can resume consumption. +// +// Note: calling MarkOffset does not necessarily commit the offset to the backend +// store immediately for efficiency reasons, and it may never be committed if +// your application crashes. This means that you may end up processing the same +// message twice, and your processing should ideally be idempotent. +func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) { + if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil { + sub.MarkOffset(msg.Offset, metadata) + } +} + +// MarkPartitionOffset marks an offset of the provided topic/partition as processed. +// See MarkOffset for additional explanation. +func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) { + if sub := c.subs.Fetch(topic, partition); sub != nil { + sub.MarkOffset(offset, metadata) + } +} + +// MarkOffsets marks stashed offsets as processed. +// See MarkOffset for additional explanation. +func (c *Consumer) MarkOffsets(s *OffsetStash) { + s.mu.Lock() + defer s.mu.Unlock() + + for tp, info := range s.offsets { + if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil { + sub.MarkOffset(info.Offset, info.Metadata) + } + delete(s.offsets, tp) + } +} + +// ResetOffsets marks the provided message as processed, alongside a metadata string +// that represents the state of the partition consumer at that point in time. The +// metadata string can be used by another consumer to restore that state, so it +// can resume consumption. +// +// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset +func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) { + if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil { + sub.ResetOffset(msg.Offset, metadata) + } +} + +// ResetPartitionOffset marks an offset of the provided topic/partition as processed. +// See ResetOffset for additional explanation. +func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) { + sub := c.subs.Fetch(topic, partition) + if sub != nil { + sub.ResetOffset(offset, metadata) + } +} + +// ResetOffsets marks stashed offsets as processed. +// See ResetOffset for additional explanation. +func (c *Consumer) ResetOffsets(s *OffsetStash) { + s.mu.Lock() + defer s.mu.Unlock() + + for tp, info := range s.offsets { + if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil { + sub.ResetOffset(info.Offset, info.Metadata) + } + delete(s.offsets, tp) + } +} + +// Subscriptions returns the consumed topics and partitions +func (c *Consumer) Subscriptions() map[string][]int32 { + return c.subs.Info() +} + +// CommitOffsets allows to manually commit previously marked offsets. By default there is no +// need to call this function as the consumer will commit offsets automatically +// using the Config.Consumer.Offsets.CommitInterval setting. +// +// Please be aware that calling this function during an internal rebalance cycle may return +// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration). 
+func (c *Consumer) CommitOffsets() error { + c.commitMu.Lock() + defer c.commitMu.Unlock() + + memberID, generationID := c.membership() + req := &sarama.OffsetCommitRequest{ + Version: 2, + ConsumerGroup: c.groupID, + ConsumerGroupGeneration: generationID, + ConsumerID: memberID, + RetentionTime: -1, + } + + if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 { + req.RetentionTime = int64(ns / time.Millisecond) + } + + snap := c.subs.Snapshot() + dirty := false + for tp, state := range snap { + if state.Dirty { + dirty = true + req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata) + } + } + if !dirty { + return nil + } + + broker, err := c.client.Coordinator(c.groupID) + if err != nil { + c.closeCoordinator(broker, err) + return err + } + + resp, err := broker.CommitOffset(req) + if err != nil { + c.closeCoordinator(broker, err) + return err + } + + for topic, errs := range resp.Errors { + for partition, kerr := range errs { + if kerr != sarama.ErrNoError { + err = kerr + } else if state, ok := snap[topicPartition{topic, partition}]; ok { + if sub := c.subs.Fetch(topic, partition); sub != nil { + sub.markCommitted(state.Info.Offset) + } + } + } + } + return err +} + +// Close safely closes the consumer and releases all resources +func (c *Consumer) Close() (err error) { + c.closeOnce.Do(func() { + close(c.dying) + <-c.dead + + if e := c.release(); e != nil { + err = e + } + if e := c.consumer.Close(); e != nil { + err = e + } + close(c.messages) + close(c.errors) + + if e := c.leaveGroup(); e != nil { + err = e + } + close(c.partitions) + close(c.notifications) + + // drain + for range c.messages { + } + for range c.errors { + } + for p := range c.partitions { + _ = p.Close() + } + for range c.notifications { + } + + c.client.release() + if c.ownClient { + if e := c.client.Close(); e != nil { + err = e + } + } + }) + return +} + +func (c *Consumer) mainLoop() { + defer close(c.dead) + defer atomic.StoreInt32(&c.consuming, 0) + + for { + atomic.StoreInt32(&c.consuming, 0) + + // Check if close was requested + select { + case <-c.dying: + return + default: + } + + // Start next consume cycle + c.nextTick() + } +} + +func (c *Consumer) nextTick() { + // Remember previous subscriptions + var notification *Notification + if c.client.config.Group.Return.Notifications { + notification = newNotification(c.subs.Info()) + } + + // Refresh coordinator + if err := c.refreshCoordinator(); err != nil { + c.rebalanceError(err, nil) + return + } + + // Release subscriptions + if err := c.release(); err != nil { + c.rebalanceError(err, nil) + return + } + + // Issue rebalance start notification + if c.client.config.Group.Return.Notifications { + c.handleNotification(notification) + } + + // Rebalance, fetch new subscriptions + subs, err := c.rebalance() + if err != nil { + c.rebalanceError(err, notification) + return + } + + // Coordinate loops, make sure everything is + // stopped on exit + tomb := newLoopTomb() + defer tomb.Close() + + // Start the heartbeat + tomb.Go(c.hbLoop) + + // Subscribe to topic/partitions + if err := c.subscribe(tomb, subs); err != nil { + c.rebalanceError(err, notification) + return + } + + // Update/issue notification with new claims + if c.client.config.Group.Return.Notifications { + notification = notification.success(subs) + c.handleNotification(notification) + } + + // Start topic watcher loop + tomb.Go(c.twLoop) + + // Start consuming and committing offsets + tomb.Go(c.cmLoop) + atomic.StoreInt32(&c.consuming, 1) + + // Wait for signals 
+ select { + case <-tomb.Dying(): + case <-c.dying: + } +} + +// heartbeat loop, triggered by the mainLoop +func (c *Consumer) hbLoop(stopped <-chan none) { + ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + switch err := c.heartbeat(); err { + case nil, sarama.ErrNoError: + case sarama.ErrNotCoordinatorForConsumer, sarama.ErrRebalanceInProgress: + return + default: + c.handleError(&Error{Ctx: "heartbeat", error: err}) + return + } + case <-stopped: + return + case <-c.dying: + return + } + } +} + +// topic watcher loop, triggered by the mainLoop +func (c *Consumer) twLoop(stopped <-chan none) { + ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + topics, err := c.client.Topics() + if err != nil { + c.handleError(&Error{Ctx: "topics", error: err}) + return + } + + for _, topic := range topics { + if !c.isKnownCoreTopic(topic) && + !c.isKnownExtraTopic(topic) && + c.isPotentialExtraTopic(topic) { + return + } + } + case <-stopped: + return + case <-c.dying: + return + } + } +} + +// commit loop, triggered by the mainLoop +func (c *Consumer) cmLoop(stopped <-chan none) { + ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); err != nil { + c.handleError(&Error{Ctx: "commit", error: err}) + return + } + case <-stopped: + return + case <-c.dying: + return + } + } +} + +func (c *Consumer) rebalanceError(err error, n *Notification) { + if n != nil { + n.Type = RebalanceError + c.handleNotification(n) + } + + switch err { + case sarama.ErrRebalanceInProgress: + default: + c.handleError(&Error{Ctx: "rebalance", error: err}) + } + + select { + case <-c.dying: + case <-time.After(c.client.config.Metadata.Retry.Backoff): + } +} + +func (c *Consumer) handleNotification(n *Notification) { + if c.client.config.Group.Return.Notifications { + select { + case c.notifications <- n: + case <-c.dying: + return + } + } +} + +func (c *Consumer) handleError(e *Error) { + if c.client.config.Consumer.Return.Errors { + select { + case c.errors <- e: + case <-c.dying: + return + } + } else { + sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error()) + } +} + +// Releases the consumer and commits offsets, called from rebalance() and Close() +func (c *Consumer) release() (err error) { + // Stop all consumers + c.subs.Stop() + + // Clear subscriptions on exit + defer c.subs.Clear() + + // Wait for messages to be processed + timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime) + defer timeout.Stop() + + select { + case <-c.dying: + case <-timeout.C: + } + + // Commit offsets, continue on errors + if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil { + err = e + } + + return +} + +// -------------------------------------------------------------------- + +// Performs a heartbeat, part of the mainLoop() +func (c *Consumer) heartbeat() error { + broker, err := c.client.Coordinator(c.groupID) + if err != nil { + c.closeCoordinator(broker, err) + return err + } + + memberID, generationID := c.membership() + resp, err := broker.Heartbeat(&sarama.HeartbeatRequest{ + GroupId: c.groupID, + MemberId: memberID, + GenerationId: generationID, + }) + if err != nil { + c.closeCoordinator(broker, err) + return err + } + return resp.Err +} + +// Performs 
a rebalance, part of the mainLoop() +func (c *Consumer) rebalance() (map[string][]int32, error) { + memberID, _ := c.membership() + sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID) + + allTopics, err := c.client.Topics() + if err != nil { + return nil, err + } + c.extraTopics = c.selectExtraTopics(allTopics) + sort.Strings(c.extraTopics) + + // Re-join consumer group + strategy, err := c.joinGroup() + switch { + case err == sarama.ErrUnknownMemberId: + c.membershipMu.Lock() + c.memberID = "" + c.membershipMu.Unlock() + return nil, err + case err != nil: + return nil, err + } + + // Sync consumer group state, fetch subscriptions + subs, err := c.syncGroup(strategy) + switch { + case err == sarama.ErrRebalanceInProgress: + return nil, err + case err != nil: + _ = c.leaveGroup() + return nil, err + } + return subs, nil +} + +// Performs the subscription, part of the mainLoop() +func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error { + // fetch offsets + offsets, err := c.fetchOffsets(subs) + if err != nil { + _ = c.leaveGroup() + return err + } + + // create consumers in parallel + var mu sync.Mutex + var wg sync.WaitGroup + + for topic, partitions := range subs { + for _, partition := range partitions { + wg.Add(1) + + info := offsets[topic][partition] + go func(topic string, partition int32) { + if e := c.createConsumer(tomb, topic, partition, info); e != nil { + mu.Lock() + err = e + mu.Unlock() + } + wg.Done() + }(topic, partition) + } + } + wg.Wait() + + if err != nil { + _ = c.release() + _ = c.leaveGroup() + } + return err +} + +// -------------------------------------------------------------------- + +// Send a request to the broker to join group on rebalance() +func (c *Consumer) joinGroup() (*balancer, error) { + memberID, _ := c.membership() + req := &sarama.JoinGroupRequest{ + GroupId: c.groupID, + MemberId: memberID, + SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond), + ProtocolType: "consumer", + } + + meta := &sarama.ConsumerGroupMemberMetadata{ + Version: 1, + Topics: append(c.coreTopics, c.extraTopics...), + UserData: c.client.config.Group.Member.UserData, + } + err := req.AddGroupProtocolMetadata(string(StrategyRange), meta) + if err != nil { + return nil, err + } + err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta) + if err != nil { + return nil, err + } + + broker, err := c.client.Coordinator(c.groupID) + if err != nil { + c.closeCoordinator(broker, err) + return nil, err + } + + resp, err := broker.JoinGroup(req) + if err != nil { + c.closeCoordinator(broker, err) + return nil, err + } else if resp.Err != sarama.ErrNoError { + c.closeCoordinator(broker, resp.Err) + return nil, resp.Err + } + + var strategy *balancer + if resp.LeaderId == resp.MemberId { + members, err := resp.GetMembers() + if err != nil { + return nil, err + } + + strategy, err = newBalancerFromMeta(c.client, members) + if err != nil { + return nil, err + } + } + + c.membershipMu.Lock() + c.memberID = resp.MemberId + c.generationID = resp.GenerationId + c.membershipMu.Unlock() + + return strategy, nil +} + +// Send a request to the broker to sync the group on rebalance(). +// Returns a list of topics and partitions to consume. 
+func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) { + memberID, generationID := c.membership() + req := &sarama.SyncGroupRequest{ + GroupId: c.groupID, + MemberId: memberID, + GenerationId: generationID, + } + + if strategy != nil { + for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) { + if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{ + Topics: topics, + }); err != nil { + return nil, err + } + } + } + + broker, err := c.client.Coordinator(c.groupID) + if err != nil { + c.closeCoordinator(broker, err) + return nil, err + } + + resp, err := broker.SyncGroup(req) + if err != nil { + c.closeCoordinator(broker, err) + return nil, err + } else if resp.Err != sarama.ErrNoError { + c.closeCoordinator(broker, resp.Err) + return nil, resp.Err + } + + // Return if there is nothing to subscribe to + if len(resp.MemberAssignment) == 0 { + return nil, nil + } + + // Get assigned subscriptions + members, err := resp.GetMemberAssignment() + if err != nil { + return nil, err + } + + // Sort partitions, for each topic + for topic := range members.Topics { + sort.Sort(int32Slice(members.Topics[topic])) + } + return members.Topics, nil +} + +// Fetches latest committed offsets for all subscriptions +func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) { + offsets := make(map[string]map[int32]offsetInfo, len(subs)) + req := &sarama.OffsetFetchRequest{ + Version: 1, + ConsumerGroup: c.groupID, + } + + for topic, partitions := range subs { + offsets[topic] = make(map[int32]offsetInfo, len(partitions)) + for _, partition := range partitions { + offsets[topic][partition] = offsetInfo{Offset: -1} + req.AddPartition(topic, partition) + } + } + + broker, err := c.client.Coordinator(c.groupID) + if err != nil { + c.closeCoordinator(broker, err) + return nil, err + } + + resp, err := broker.FetchOffset(req) + if err != nil { + c.closeCoordinator(broker, err) + return nil, err + } + + for topic, partitions := range subs { + for _, partition := range partitions { + block := resp.GetBlock(topic, partition) + if block == nil { + return nil, sarama.ErrIncompleteResponse + } + + if block.Err == sarama.ErrNoError { + offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata} + } else { + return nil, block.Err + } + } + } + return offsets, nil +} + +// Send a request to the broker to leave the group on failes rebalance() and on Close() +func (c *Consumer) leaveGroup() error { + broker, err := c.client.Coordinator(c.groupID) + if err != nil { + c.closeCoordinator(broker, err) + return err + } + + memberID, _ := c.membership() + if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{ + GroupId: c.groupID, + MemberId: memberID, + }); err != nil { + c.closeCoordinator(broker, err) + } + return err +} + +// -------------------------------------------------------------------- + +func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error { + memberID, _ := c.membership() + sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial)) + + // Create partitionConsumer + pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial) + if err != nil { + return err + } + + // Store in subscriptions + c.subs.Store(topic, partition, pc) + + // Start partition consumer goroutine + 
tomb.Go(func(stopper <-chan none) { + if c.client.config.Group.Mode == ConsumerModePartitions { + pc.waitFor(stopper, c.errors) + } else { + pc.multiplex(stopper, c.messages, c.errors) + } + }) + + if c.client.config.Group.Mode == ConsumerModePartitions { + c.partitions <- pc + } + return nil +} + +func (c *Consumer) commitOffsetsWithRetry(retries int) error { + err := c.CommitOffsets() + if err != nil && retries > 0 { + return c.commitOffsetsWithRetry(retries - 1) + } + return err +} + +func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) { + if broker != nil { + _ = broker.Close() + } + + switch err { + case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer: + _ = c.client.RefreshCoordinator(c.groupID) + } +} + +func (c *Consumer) selectExtraTopics(allTopics []string) []string { + extra := allTopics[:0] + for _, topic := range allTopics { + if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) { + extra = append(extra, topic) + } + } + return extra +} + +func (c *Consumer) isKnownCoreTopic(topic string) bool { + pos := sort.SearchStrings(c.coreTopics, topic) + return pos < len(c.coreTopics) && c.coreTopics[pos] == topic +} + +func (c *Consumer) isKnownExtraTopic(topic string) bool { + pos := sort.SearchStrings(c.extraTopics, topic) + return pos < len(c.extraTopics) && c.extraTopics[pos] == topic +} + +func (c *Consumer) isPotentialExtraTopic(topic string) bool { + rx := c.client.config.Group.Topics + if rx.Blacklist != nil && rx.Blacklist.MatchString(topic) { + return false + } + if rx.Whitelist != nil && rx.Whitelist.MatchString(topic) { + return true + } + return false +} + +func (c *Consumer) refreshCoordinator() error { + if err := c.refreshMetadata(); err != nil { + return err + } + return c.client.RefreshCoordinator(c.groupID) +} + +func (c *Consumer) refreshMetadata() (err error) { + if c.client.config.Metadata.Full { + err = c.client.RefreshMetadata() + } else { + var topics []string + if topics, err = c.client.Topics(); err == nil && len(topics) != 0 { + err = c.client.RefreshMetadata(topics...) + } + } + + // maybe we didn't have authorization to describe all topics + switch err { + case sarama.ErrTopicAuthorizationFailed: + err = c.client.RefreshMetadata(c.coreTopics...) + } + return +} + +func (c *Consumer) membership() (memberID string, generationID int32) { + c.membershipMu.RLock() + memberID, generationID = c.memberID, c.generationID + c.membershipMu.RUnlock() + return +} diff --git a/vendor/github.com/bsm/sarama-cluster/doc.go b/vendor/github.com/bsm/sarama-cluster/doc.go new file mode 100644 index 00000000000..9c8ff16a77e --- /dev/null +++ b/vendor/github.com/bsm/sarama-cluster/doc.go @@ -0,0 +1,8 @@ +/* +Package cluster provides cluster extensions for Sarama, enabing users +to consume topics across from multiple, balanced nodes. 
+ +It requires Kafka v0.9+ and follows the steps guide, described in: +https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design +*/ +package cluster diff --git a/vendor/github.com/bsm/sarama-cluster/offsets.go b/vendor/github.com/bsm/sarama-cluster/offsets.go new file mode 100644 index 00000000000..4223ac5e012 --- /dev/null +++ b/vendor/github.com/bsm/sarama-cluster/offsets.go @@ -0,0 +1,69 @@ +package cluster + +import ( + "sync" + + "github.com/Shopify/sarama" +) + +// OffsetStash allows to accumulate offsets and +// mark them as processed in a bulk +type OffsetStash struct { + offsets map[topicPartition]offsetInfo + mu sync.Mutex +} + +// NewOffsetStash inits a blank stash +func NewOffsetStash() *OffsetStash { + return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)} +} + +// MarkOffset stashes the provided message offset +func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) { + s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata) +} + +// MarkPartitionOffset stashes the offset for the provided topic/partition combination +func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) { + s.mu.Lock() + defer s.mu.Unlock() + + key := topicPartition{Topic: topic, Partition: partition} + if info := s.offsets[key]; offset >= info.Offset { + info.Offset = offset + info.Metadata = metadata + s.offsets[key] = info + } +} + +// ResetPartitionOffset stashes the offset for the provided topic/partition combination. +// Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets +func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) { + s.mu.Lock() + defer s.mu.Unlock() + + key := topicPartition{Topic: topic, Partition: partition} + if info := s.offsets[key]; offset <= info.Offset { + info.Offset = offset + info.Metadata = metadata + s.offsets[key] = info + } +} + +// ResetOffset stashes the provided message offset +// See ResetPartitionOffset for explanation +func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) { + s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata) +} + +// Offsets returns the latest stashed offsets by topic-partition +func (s *OffsetStash) Offsets() map[string]int64 { + s.mu.Lock() + defer s.mu.Unlock() + + res := make(map[string]int64, len(s.offsets)) + for tp, info := range s.offsets { + res[tp.String()] = info.Offset + } + return res +} diff --git a/vendor/github.com/bsm/sarama-cluster/partitions.go b/vendor/github.com/bsm/sarama-cluster/partitions.go new file mode 100644 index 00000000000..bfaa587830d --- /dev/null +++ b/vendor/github.com/bsm/sarama-cluster/partitions.go @@ -0,0 +1,290 @@ +package cluster + +import ( + "sort" + "sync" + "time" + + "github.com/Shopify/sarama" +) + +// PartitionConsumer allows code to consume individual partitions from the cluster. +// +// See docs for Consumer.Partitions() for more on how to implement this. +type PartitionConsumer interface { + sarama.PartitionConsumer + + // Topic returns the consumed topic name + Topic() string + + // Partition returns the consumed partition + Partition() int32 + + // InitialOffset returns the offset used for creating the PartitionConsumer instance. + // The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest + InitialOffset() int64 + + // MarkOffset marks the offset of a message as preocessed. 
+ MarkOffset(offset int64, metadata string) + + // ResetOffset resets the offset to a previously processed message. + ResetOffset(offset int64, metadata string) +} + +type partitionConsumer struct { + sarama.PartitionConsumer + + state partitionState + mu sync.Mutex + + topic string + partition int32 + initialOffset int64 + + closeOnce sync.Once + closeErr error + + dying, dead chan none +} + +func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) { + offset := info.NextOffset(defaultOffset) + pcm, err := manager.ConsumePartition(topic, partition, offset) + + // Resume from default offset, if requested offset is out-of-range + if err == sarama.ErrOffsetOutOfRange { + info.Offset = -1 + offset = defaultOffset + pcm, err = manager.ConsumePartition(topic, partition, offset) + } + if err != nil { + return nil, err + } + + return &partitionConsumer{ + PartitionConsumer: pcm, + state: partitionState{Info: info}, + + topic: topic, + partition: partition, + initialOffset: offset, + + dying: make(chan none), + dead: make(chan none), + }, nil +} + +// Topic implements PartitionConsumer +func (c *partitionConsumer) Topic() string { return c.topic } + +// Partition implements PartitionConsumer +func (c *partitionConsumer) Partition() int32 { return c.partition } + +// InitialOffset implements PartitionConsumer +func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset } + +// AsyncClose implements PartitionConsumer +func (c *partitionConsumer) AsyncClose() { + c.closeOnce.Do(func() { + c.closeErr = c.PartitionConsumer.Close() + close(c.dying) + }) +} + +// Close implements PartitionConsumer +func (c *partitionConsumer) Close() error { + c.AsyncClose() + <-c.dead + return c.closeErr +} + +func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) { + defer close(c.dead) + + for { + select { + case err, ok := <-c.Errors(): + if !ok { + return + } + select { + case errors <- err: + case <-stopper: + return + case <-c.dying: + return + } + case <-stopper: + return + case <-c.dying: + return + } + } +} + +func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) { + defer close(c.dead) + + for { + select { + case msg, ok := <-c.Messages(): + if !ok { + return + } + select { + case messages <- msg: + case <-stopper: + return + case <-c.dying: + return + } + case err, ok := <-c.Errors(): + if !ok { + return + } + select { + case errors <- err: + case <-stopper: + return + case <-c.dying: + return + } + case <-stopper: + return + case <-c.dying: + return + } + } +} + +func (c *partitionConsumer) getState() partitionState { + c.mu.Lock() + state := c.state + c.mu.Unlock() + + return state +} + +func (c *partitionConsumer) markCommitted(offset int64) { + c.mu.Lock() + if offset == c.state.Info.Offset { + c.state.Dirty = false + } + c.mu.Unlock() +} + +// MarkOffset implements PartitionConsumer +func (c *partitionConsumer) MarkOffset(offset int64, metadata string) { + c.mu.Lock() + if next := offset + 1; next > c.state.Info.Offset { + c.state.Info.Offset = next + c.state.Info.Metadata = metadata + c.state.Dirty = true + } + c.mu.Unlock() +} + +// ResetOffset implements PartitionConsumer +func (c *partitionConsumer) ResetOffset(offset int64, metadata string) { + c.mu.Lock() + if next := offset + 1; next <= c.state.Info.Offset { + c.state.Info.Offset = next + c.state.Info.Metadata = metadata + c.state.Dirty = true + } + 
c.mu.Unlock() +} + +// -------------------------------------------------------------------- + +type partitionState struct { + Info offsetInfo + Dirty bool + LastCommit time.Time +} + +// -------------------------------------------------------------------- + +type partitionMap struct { + data map[topicPartition]*partitionConsumer + mu sync.RWMutex +} + +func newPartitionMap() *partitionMap { + return &partitionMap{ + data: make(map[topicPartition]*partitionConsumer), + } +} + +func (m *partitionMap) IsSubscribedTo(topic string) bool { + m.mu.RLock() + defer m.mu.RUnlock() + + for tp := range m.data { + if tp.Topic == topic { + return true + } + } + return false +} + +func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer { + m.mu.RLock() + pc, _ := m.data[topicPartition{topic, partition}] + m.mu.RUnlock() + return pc +} + +func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) { + m.mu.Lock() + m.data[topicPartition{topic, partition}] = pc + m.mu.Unlock() +} + +func (m *partitionMap) Snapshot() map[topicPartition]partitionState { + m.mu.RLock() + defer m.mu.RUnlock() + + snap := make(map[topicPartition]partitionState, len(m.data)) + for tp, pc := range m.data { + snap[tp] = pc.getState() + } + return snap +} + +func (m *partitionMap) Stop() { + m.mu.RLock() + defer m.mu.RUnlock() + + var wg sync.WaitGroup + for tp := range m.data { + wg.Add(1) + go func(p *partitionConsumer) { + _ = p.Close() + wg.Done() + }(m.data[tp]) + } + wg.Wait() +} + +func (m *partitionMap) Clear() { + m.mu.Lock() + for tp := range m.data { + delete(m.data, tp) + } + m.mu.Unlock() +} + +func (m *partitionMap) Info() map[string][]int32 { + info := make(map[string][]int32) + m.mu.RLock() + for tp := range m.data { + info[tp.Topic] = append(info[tp.Topic], tp.Partition) + } + m.mu.RUnlock() + + for topic := range info { + sort.Sort(int32Slice(info[topic])) + } + return info +} diff --git a/vendor/github.com/bsm/sarama-cluster/util.go b/vendor/github.com/bsm/sarama-cluster/util.go new file mode 100644 index 00000000000..e7cb5dd1b8f --- /dev/null +++ b/vendor/github.com/bsm/sarama-cluster/util.go @@ -0,0 +1,75 @@ +package cluster + +import ( + "fmt" + "sort" + "sync" +) + +type none struct{} + +type topicPartition struct { + Topic string + Partition int32 +} + +func (tp *topicPartition) String() string { + return fmt.Sprintf("%s-%d", tp.Topic, tp.Partition) +} + +type offsetInfo struct { + Offset int64 + Metadata string +} + +func (i offsetInfo) NextOffset(fallback int64) int64 { + if i.Offset > -1 { + return i.Offset + } + return fallback +} + +type int32Slice []int32 + +func (p int32Slice) Len() int { return len(p) } +func (p int32Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p int32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +func (p int32Slice) Diff(o int32Slice) (res []int32) { + on := len(o) + for _, x := range p { + n := sort.Search(on, func(i int) bool { return o[i] >= x }) + if n < on && o[n] == x { + continue + } + res = append(res, x) + } + return +} + +// -------------------------------------------------------------------- + +type loopTomb struct { + c chan none + o sync.Once + w sync.WaitGroup +} + +func newLoopTomb() *loopTomb { + return &loopTomb{c: make(chan none)} +} + +func (t *loopTomb) stop() { t.o.Do(func() { close(t.c) }) } +func (t *loopTomb) Close() { t.stop(); t.w.Wait() } + +func (t *loopTomb) Dying() <-chan none { return t.c } +func (t *loopTomb) Go(f func(<-chan none)) { + t.w.Add(1) + + go func() { + defer t.stop() + defer 
t.w.Done() + + f(t.c) + }() +}
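The vendored consumer.go and config.go above document the group-consumer API this change pulls in (NewConsumer, Messages(), Errors(), Notifications(), MarkOffset, and the commit loop driven by Consumer.Offsets.CommitInterval). The snippet below is a minimal, illustrative sketch of the default multiplex mode only; it is not part of this diff or of the vendored library, and the group ID "demo-group" and topic "knative-demo-topic" are placeholder assumptions (the broker address reuses the sample bootstrap_servers value from the provisioner ConfigMap).

// Illustrative sketch, not part of this change: consuming in the default
// ConsumerModeMultiplex mode with the vendored sarama-cluster API.
package main

import (
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	// Start from the library defaults, then opt in to the channels we read below.
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	brokers := []string{"kafkabroker.kafka:9092"}  // placeholder bootstrap server
	topics := []string{"knative-demo-topic"}       // placeholder topic
	consumer, err := cluster.NewConsumer(brokers, "demo-group", topics, config)
	if err != nil {
		log.Fatalln(err)
	}
	defer consumer.Close()

	// Once enabled above, Errors() and Notifications() must be drained or the
	// consumer will block.
	go func() {
		for err := range consumer.Errors() {
			log.Printf("consumer error: %v", err)
		}
	}()
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("rebalanced: %+v", ntf)
		}
	}()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// Default multiplex mode: messages from all claimed partitions arrive on Messages().
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if !ok {
				return
			}
			log.Printf("%s/%d@%d: %s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
			// Mark (not yet commit) the offset; the commit loop flushes marked
			// offsets on Consumer.Offsets.CommitInterval, or call CommitOffsets().
			consumer.MarkOffset(msg, "")
		case <-signals:
			return
		}
	}
}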
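For callers that need per-partition control, Config.Group.Mode and Consumer.Partitions() (documented in the vendored consumer.go) expose each claimed partition as its own PartitionConsumer. The companion sketch below is likewise illustrative only, with the same placeholder broker, group, and topic assumptions; per the doc comments, Partitions() must be drained for the life of the consumer because every rebalance closes old PartitionConsumers and emits new ones.

// Illustrative sketch, not part of this change: ConsumerModePartitions.
package main

import (
	"log"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	config := cluster.NewConfig()
	config.Group.Mode = cluster.ConsumerModePartitions

	consumer, err := cluster.NewConsumer(
		[]string{"kafkabroker.kafka:9092"}, // placeholder bootstrap server
		"demo-group",                       // placeholder group ID
		[]string{"knative-demo-topic"},     // placeholder topic
		config,
	)
	if err != nil {
		log.Fatalln(err)
	}
	// Shutdown handling omitted for brevity; a real caller would Close() the
	// consumer on termination, which also closes the Partitions() channel.

	// Each PartitionConsumer owns one topic/partition claim and completes
	// naturally when the group rebalances.
	for pc := range consumer.Partitions() {
		go func(pc cluster.PartitionConsumer) {
			for msg := range pc.Messages() {
				log.Printf("%s/%d@%d: %s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
				consumer.MarkOffset(msg, "") // offsets are still marked on the parent consumer
			}
		}(pc)
	}
}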