diff --git a/pkg/utils/cache/persisted_store.go b/pkg/utils/cache/persisted_store.go deleted file mode 100644 index 10da00e7164..00000000000 --- a/pkg/utils/cache/persisted_store.go +++ /dev/null @@ -1,235 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package cache - -import ( - "context" - "encoding/json" - "sync/atomic" - "time" - - "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - - duckv1 "knative.dev/pkg/apis/duck/v1" - "knative.dev/pkg/logging" -) - -const ( - // ResourcesKey is the configmap key holding resources - ResourcesKey = "resources.json" - - // ComponentLabelKey is the label added to ConfigMaps - ComponentLabelKey = "eventing.knative.dev/component" -) - -// PersistedStore persists cache.Store objects to a single ConfigMap. -type PersistedStore interface { - - // Run starts persisting the cache store to the configmap-backed store - Run(ctx context.Context) -} - -// Projector projects the given object to the persisted object -type Projector func(interface{}) interface{} - -type persistedStore struct { - // Component associated to this store - component string - - // kubeClient is the client use to manage kube objects - kubeClient kubernetes.Interface - - // namespace of the configmap - namespace string - - // name of the configmap - name string - - // store is the in-memory cache - store cache.Store - - // Projector function transforms objects before being persisted - // If the returned object is nil, it is not persisted. - project func(interface{}) interface{} - - // sync channel - syncCh chan bool - - // syncing state (idle: 0, syncing: 1) - syncing int32 -} - -// NewPersistedStore creates a persisted store for the given informer -func NewPersistedStore( - component string, - kubeClient kubernetes.Interface, - namespace string, - name string, - informer cache.SharedInformer, - projector Projector) PersistedStore { - - if projector == nil { - // identity projection - projector = func(v interface{}) interface{} { - return v - } - } - - pstore := &persistedStore{ - component: component, - kubeClient: kubeClient, - namespace: namespace, - name: name, - store: informer.GetStore(), - project: projector, - syncCh: make(chan bool, 1), - syncing: 0, - } - informer.AddEventHandler(pstore) - - return pstore -} - -func (p *persistedStore) Run(ctx context.Context) { - logger := logging.FromContext(ctx) - - // Trigger a sync to make sure the ConfigMap resource exists - p.sync() - - wait.UntilWithContext(ctx, func(ctx context.Context) { - select { - case <-ctx.Done(): - return - case <-p.syncCh: - atomic.StoreInt32(&p.syncing, 1) - if err := p.doSync(ctx.Done()); err != nil { - logger.Warnw("failed to persist resources", zap.Error(err)) - - // Retry - p.sync() - } - atomic.StoreInt32(&p.syncing, 0) - } - }, 10*time.Millisecond) -} - -// Sync triggers the synchronization process. -// If a sync is already taking place, it is interrupted before triggering it. -func (p *persistedStore) sync() { - // Interrupt current sync process (if ongoing) - atomic.StoreInt32(&p.syncing, 0) - - if len(p.syncCh) == 0 { - p.syncCh <- true - } -} - -func (p *persistedStore) doSync(stopCh <-chan struct{}) error { - cm, err := p.load() - if err != nil { - return err - } - - if cm.Data == nil { - cm.Data = make(map[string]string) - } - - // TODO: add support for partitioning - - config := make(map[string]interface{}) - objs := p.store.List() - for _, obj := range objs { - if len(stopCh) > 0 || atomic.LoadInt32(&p.syncing) == 0 { - // either cancelled or interrupted. - return nil - } - - // Only add object that are ready - kr := obj.(duckv1.KRShaped) - if !kr.GetStatus().GetCondition(kr.GetConditionSet().GetTopLevelConditionType()).IsTrue() { - continue - } - - key := kr.GetNamespace() + "/" + kr.GetName() - if projected := p.project(obj); projected != nil { - config[key] = projected - } else { - delete(config, key) - } - } - - bconfig, err := json.Marshal(config) - if err != nil { - return err - } - newconfig := string(bconfig) - - if oldconfig, ok := cm.Data[ResourcesKey]; !ok || oldconfig != newconfig { - cm.Data[ResourcesKey] = newconfig - _, err = p.kubeClient.CoreV1().ConfigMaps(p.namespace).Update(context.Background(), cm, metav1.UpdateOptions{}) - - if err != nil { - return err - } - } - return nil -} - -func (p *persistedStore) load() (*corev1.ConfigMap, error) { - cm, err := p.kubeClient.CoreV1().ConfigMaps(p.namespace).Get(context.Background(), p.name, metav1.GetOptions{}) - if err != nil { - if !errors.IsNotFound(err) { - return nil, err - } - - cm := &corev1.ConfigMap{ - TypeMeta: metav1.TypeMeta{ - APIVersion: "v1", - Kind: "ConfigMap", - }, - ObjectMeta: metav1.ObjectMeta{ - Name: p.name, - Namespace: p.namespace, - Labels: map[string]string{ - ComponentLabelKey: p.component, - }, - }, - Data: map[string]string{}, - } - - return p.kubeClient.CoreV1().ConfigMaps(p.namespace).Create(context.Background(), cm, metav1.CreateOptions{}) - } - - return cm, nil -} - -func (p *persistedStore) OnAdd(interface{}) { - p.sync() -} - -func (p *persistedStore) OnUpdate(interface{}, interface{}) { - p.sync() -} - -func (p *persistedStore) OnDelete(interface{}) { - p.sync() -} diff --git a/pkg/utils/cache/persisted_store_test.go b/pkg/utils/cache/persisted_store_test.go deleted file mode 100644 index 01cae49a6da..00000000000 --- a/pkg/utils/cache/persisted_store_test.go +++ /dev/null @@ -1,290 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package cache - -import ( - "context" - "testing" - "time" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes/fake" - ktesting "k8s.io/client-go/testing" - "k8s.io/client-go/tools/cache" - - "knative.dev/pkg/apis" - duckv1 "knative.dev/pkg/apis/duck/v1" - "knative.dev/pkg/logging" - logtesting "knative.dev/pkg/logging/testing" -) - -type KResourceWithSpec struct { - duckv1.KResource - Spec interface{} -} - -const ( - cmNs = "test-ns" - cmName = "test-name" - timeOutValue = 5 * time.Second -) - -var ( - // Check that Channel can return its spec untyped. - _ apis.HasSpec = (*KResourceWithSpec)(nil) -) - -func (k *KResourceWithSpec) GetUntypedSpec() interface{} { - return k.Spec -} - -func TestPersistedStore(t *testing.T) { - const ( - cmNs = "test-cm-ns" - cmName = "test-cm-name" - ) - cs := fake.NewSimpleClientset() - created := make(chan runtime.Object) - updated := make(chan runtime.Object) - done := make(chan bool) - cs.PrependReactor("create", "configmaps", - func(action ktesting.Action) (bool, runtime.Object, error) { - created <- action.(ktesting.CreateAction).GetObject() - return false, nil, nil - }, - ) - cs.PrependReactor("update", "configmaps", - func(action ktesting.Action) (bool, runtime.Object, error) { - updated <- action.(ktesting.UpdateAction).GetObject() - return false, nil, nil - }, - ) - - store := cache.NewStore(cache.MetaNamespaceKeyFunc) - informer := newKResourceInformer(store) - - logger := logtesting.TestLogger(t) - - pstore := NewPersistedStore("my-component", cs, cmNs, cmName, informer, func(obj interface{}) interface{} { - return obj.(apis.HasSpec).GetUntypedSpec() - }) - ctx := logging.WithLogger(context.Background(), logger) - ctx, cancel := context.WithCancel(ctx) - go func() { - pstore.Run(ctx) - done <- true - }() - - kr := newKResource("test-ns", "test-name") - informer.Add(kr) - - select { - case obj := <-created: - // We expect the configmap to be created. - cm := obj.(*corev1.ConfigMap) - if value, ok := cm.Labels[ComponentLabelKey]; !ok || value != "my-component" { - t.Fatalf("Missing %s label. Got %v", ComponentLabelKey, cm) - } - case <-time.After(timeOutValue): - t.Fatal("Timed out waiting for configmap creation.") - } - - select { - case obj := <-updated: - cm := obj.(*corev1.ConfigMap) - if value, ok := cm.Data["resources.json"]; !ok || value != `{"test-ns/test-name":"aspec"}` { - t.Fatalf("Unexpected ConfigMap. Got %v", cm) - } - case <-time.After(timeOutValue): - t.Fatal("Timed out waiting for configmap update.") - } - - informer.Delete(kr) - - select { - case obj := <-updated: - cm := obj.(*corev1.ConfigMap) - if spec, ok := cm.Data["resources.json"]; !ok || spec != `{}` { - t.Fatalf("Unexpected ConfigMap. Got %v", cm) - } - case <-time.After(timeOutValue): - t.Fatal("Timed out waiting for configmap update.") - } - - cancel() - - select { - case <-done: - case <-time.After(timeOutValue): - t.Fatal("Timed out waiting for persisted store to stop.") - } - -} - -func TestPersistedStoreUnStarted(t *testing.T) { - cs := fake.NewSimpleClientset() - store := cache.NewStore(cache.MetaNamespaceKeyFunc) - informer := newKResourceInformer(store) - pstore := NewPersistedStore("my-component", cs, cmNs, cmName, informer, nil).(*persistedStore) - - done := make(chan bool) - go func() { - pstore.sync() - pstore.sync() - pstore.sync() - done <- true - }() - select { - case <-done: - // All good, none blocking - case <-time.After(timeOutValue): - t.Fatal("Timed out while waiting for multiple sync triggers to terminate.") - } -} - -func TestPersistedStoreInterrupted(t *testing.T) { - const ( - cmNs = "test-cm-ns" - cmName = "test-cm-name" - ) - cs := fake.NewSimpleClientset() - created := make(chan runtime.Object) - updated := make(chan runtime.Object) - cs.PrependReactor("create", "configmaps", - func(action ktesting.Action) (bool, runtime.Object, error) { - time.Sleep(timeOutValue) - created <- action.(ktesting.CreateAction).GetObject() - return false, nil, nil - }, - ) - cs.PrependReactor("update", "configmaps", - func(action ktesting.Action) (bool, runtime.Object, error) { - updated <- action.(ktesting.UpdateAction).GetObject() - return false, nil, nil - }, - ) - - store := cache.NewStore(cache.MetaNamespaceKeyFunc) - informer := newKResourceInformer(store) - logger := logtesting.TestLogger(t) - - pstore := NewPersistedStore("my-component", cs, cmNs, cmName, informer, func(obj interface{}) interface{} { - return obj.(apis.HasSpec).GetUntypedSpec() - }) - - ctx := logging.WithLogger(context.Background(), logger) - go func() { - pstore.Run(ctx) - }() - - kr := newKResource("test-ns", "test-name") - informer.Add(kr) - - time.Sleep(500 * time.Millisecond) - - kr2 := newKResource("test-ns", "test-name-2") - informer.Add(kr2) // interrupt - - select { - case obj := <-created: - // We expect the configmap to be created. - cm := obj.(*corev1.ConfigMap) - if value, ok := cm.Labels[ComponentLabelKey]; !ok || value != "my-component" { - t.Fatalf("Missing %s label. Got %v", ComponentLabelKey, cm) - } - case <-time.After(timeOutValue): - t.Fatal("Timed out waiting for configmap creation.") - } - - select { - case obj := <-updated: - cm := obj.(*corev1.ConfigMap) - if value, ok := cm.Data["resources.json"]; !ok || value != `{"test-ns/test-name":"aspec","test-ns/test-name-2":"aspec"}` { - t.Fatalf("Unexpected ConfigMap. Got %v", cm) - } - case <-time.After(timeOutValue): - t.Fatal("Timed out waiting for configmap update.") - } -} - -func newKResource(ns, name string) *KResourceWithSpec { - return &KResourceWithSpec{ - KResource: duckv1.KResource{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: ns, - Name: name, - }, - Status: duckv1.Status{ - Conditions: duckv1.Conditions{ - { - Type: apis.ConditionReady, - Status: corev1.ConditionTrue, - }, - }, - }, - }, - Spec: "aspec", - } -} - -type kresourceInformer struct { - store cache.Store - handler cache.ResourceEventHandler -} - -func newKResourceInformer(store cache.Store) *kresourceInformer { - return &kresourceInformer{ - store: store, - } -} - -func (k *kresourceInformer) Add(obj interface{}) { - k.store.Add(obj) - k.handler.OnAdd(obj) -} - -func (k *kresourceInformer) Delete(obj interface{}) { - k.store.Delete(obj) - k.handler.OnDelete(obj) -} - -func (k *kresourceInformer) AddEventHandler(handler cache.ResourceEventHandler) { - k.handler = handler -} - -func (k *kresourceInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) { -} - -func (k *kresourceInformer) GetStore() cache.Store { - return k.store -} - -func (k *kresourceInformer) GetController() cache.Controller { - return nil -} - -func (k *kresourceInformer) Run(stopCh <-chan struct{}) { -} - -func (k *kresourceInformer) HasSynced() bool { - return true -} - -func (k *kresourceInformer) LastSyncResourceVersion() string { - return "" -}