Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,18 @@ import (

"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"go.opentelemetry.io/otel"
)

func waitOnEBPFEngineProcessErrorCode(cacheAccumulatorErrorChan chan error) {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "EBPF engine process error")
defer span.End()
err := <-cacheAccumulatorErrorChan
if err != nil {
logger.L().Ctx(ctx).Fatal("error", helpers.Error(err))
}
}

func main() {
cfg := config.GetConfigurationConfigContext()
configData, err := cfg.GetConfigurationReader()
Expand All @@ -35,6 +45,9 @@ func main() {
if err != nil {
logger.L().Ctx(context.GetBackgroundContext()).Fatal("error during start accumulator", helpers.Error(err))
}
go func() {
waitOnEBPFEngineProcessErrorCode(accumulatorChannelError)
}()

k8sAPIServerClient, err := conthandler.CreateContainerClientK8SAPIServer()
if err != nil {
Expand Down
15 changes: 1 addition & 14 deletions pkg/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"sniffer/pkg/config"

"github.com/kubescape/go-logger"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

const (
Expand All @@ -17,7 +15,6 @@ const (

type BackgroundContext struct {
ctx context.Context
span trace.Span
}

var backgroundContext BackgroundContext
Expand All @@ -34,19 +31,9 @@ func SetBackgroundContext() {
config.GetConfigurationConfigContext().GetAccountID(),
config.GetConfigurationConfigContext().GetClusterName(),
url.URL{Host: config.GetConfigurationConfigContext().GetBackgroundContextURL()})
setMainSpan(ctx)
}

func setMainSpan(context context.Context) {
ctx, span := otel.Tracer("").Start(context, "mainSpan")
backgroundContext.ctx = ctx
backgroundContext.span = span
}

func GetBackgroundContext() context.Context {
return backgroundContext.ctx
}

func GetMainSpan() trace.Span {
return backgroundContext.span
}
}
39 changes: 20 additions & 19 deletions pkg/conthandler/container_main_handler.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package conthandler

import (
gcontext "context"
"errors"
"fmt"
"sniffer/pkg/config"
Expand Down Expand Up @@ -78,11 +77,11 @@ func (ch *ContainerHandler) afterTimerActions() error {
var err error

for {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions")
afterTimerActionsData := <-ch.afterTimerActionsChannel
containerDataInterface, exist := ch.watchedContainers.Load(afterTimerActionsData.containerID)
if !exist {
logger.L().Ctx(context.GetBackgroundContext()).Warning("afterTimerActions: failed to get container data of container ID", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID)}...)
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions")
logger.L().Ctx(ctx).Warning("afterTimerActions: failed to get container data of container ID", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID)}...)
span.End()
continue
}
Expand All @@ -93,23 +92,23 @@ func (ch *ContainerHandler) afterTimerActions() error {

if err = <-containerData.syncChannel[StepGetSBOM]; err != nil {
logger.L().Debug("failed to get SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("container name", containerData.event.GetContainerName()), helpers.String("k8s resource ", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
span.End()
continue
}
if err = containerData.sbomClient.FilterSBOM(fileList); err != nil {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions")
logger.L().Ctx(ctx).Warning("failed to filter SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("container name", containerData.event.GetContainerName()), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
span.End()
continue
}
if err = containerData.sbomClient.StoreFilterSBOM(containerData.event.GetInstanceIDHash()); err != nil {
if !errors.Is(err, sbom.IsAlreadyExist()) {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "afterTimerActions")
logger.L().Ctx(ctx).Error("failed to store filtered SBOM", []helpers.IDetails{helpers.String("container ID", afterTimerActionsData.containerID), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID()), helpers.Error(err)}...)
span.End()
}
span.End()
continue
}
logger.L().Info("filtered SBOM has been stored successfully", []helpers.IDetails{helpers.String("containerID", afterTimerActionsData.containerID), helpers.String("k8s resource", containerData.event.GetK8SWorkloadID())}...)
span.End() // defer in infinite loop never runs
}
}
}
Expand All @@ -129,6 +128,7 @@ func (ch *ContainerHandler) startTimer(watchedContainer watchedContainerData, co
err = droppedEventsError
} else if errors.Is(err, containerHasTerminatedError) {
watchedContainer.snifferTicker.Stop()
err = containerHasTerminatedError
}
}

Expand All @@ -146,17 +146,16 @@ func (ch *ContainerHandler) deleteResources(watchedContainer watchedContainerDat
ch.watchedContainers.Delete(contEvent.GetContainerID())
}

func (ch *ContainerHandler) startRelevancyProcess(contEvent v1.ContainerEventData, ctx gcontext.Context) {
func (ch *ContainerHandler) startRelevancyProcess(contEvent v1.ContainerEventData) {
containerDataInterface, exist := ch.watchedContainers.Load(contEvent.GetContainerID())
if !exist {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "container monitoring", trace.WithAttributes(attribute.String("containerID", contEvent.GetContainerID()), attribute.String("container workload", contEvent.GetK8SWorkloadID())))
defer span.End()
logger.L().Ctx(ctx).Error("startRelevancyProcess: failed to get container data", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()))
return
}
watchedContainer := containerDataInterface.(watchedContainerData)

ctx, span := otel.Tracer("").Start(ctx, "container monitoring", trace.WithAttributes(attribute.String("containerID", contEvent.GetContainerID()), attribute.String("container workload", contEvent.GetK8SWorkloadID())))
defer span.End()

err := watchedContainer.containerAggregator.StartAggregate(watchedContainer.syncChannel[StepEventAggregator])
if err != nil {
return
Expand All @@ -167,14 +166,16 @@ func (ch *ContainerHandler) startRelevancyProcess(contEvent v1.ContainerEventDat
stopSniffingTime := now.Add(configStopTime)
for start := time.Now(); start.Before(stopSniffingTime); {
go ch.getSBOM(contEvent)
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "container monitoring", trace.WithAttributes(attribute.String("containerID", contEvent.GetContainerID()), attribute.String("container workload", contEvent.GetK8SWorkloadID())))
err = ch.startTimer(watchedContainer, contEvent.GetContainerID())
if err != nil {
if errors.Is(err, droppedEventsError) {
logger.L().Ctx(ctx).Warning("container monitoring got drop events - we may miss some realtime data", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()), helpers.Error(err))
} else if errors.Is(err, containerHasTerminatedError) {
break
}
} else if errors.Is(err, containerHasTerminatedError) {
break
}
span.End()
}
logger.L().Info("stop monitor on container - after monitoring time", helpers.String("container ID", contEvent.GetContainerID()), helpers.String("container name", contEvent.GetContainerName()), helpers.String("k8s resources", contEvent.GetK8SWorkloadID()), helpers.Error(err))
ch.deleteResources(watchedContainer, contEvent)
Expand All @@ -199,7 +200,7 @@ func (ch *ContainerHandler) getSBOM(contEvent v1.ContainerEventData) {
watchedContainer.syncChannel[StepGetSBOM] <- err
}

func (ch *ContainerHandler) handleContainerRunningEvent(contEvent v1.ContainerEventData, ctx gcontext.Context) error {
func (ch *ContainerHandler) handleContainerRunningEvent(contEvent v1.ContainerEventData) error {
_, exist := ch.watchedContainers.Load(contEvent.GetContainerID())
if exist {
return containerAlreadyExistError
Expand All @@ -216,7 +217,7 @@ func (ch *ContainerHandler) handleContainerRunningEvent(contEvent v1.ContainerEv
},
}
ch.watchedContainers.Store(contEvent.GetContainerID(), newWatchedContainer)
go ch.startRelevancyProcess(contEvent, ctx)
go ch.startRelevancyProcess(contEvent)
return nil
}

Expand All @@ -232,10 +233,10 @@ func (ch *ContainerHandler) handleContainerTerminatedEvent(contEvent v1.Containe
return nil
}

func (ch *ContainerHandler) handleNewContainerEvent(contEvent v1.ContainerEventData, ctx gcontext.Context) error {
func (ch *ContainerHandler) handleNewContainerEvent(contEvent v1.ContainerEventData) error {
switch contEvent.GetContainerEventType() {
case v1.ContainerRunning:
return ch.handleContainerRunningEvent(contEvent, ctx)
return ch.handleContainerRunningEvent(contEvent)

case v1.ContainerDeleted:
return ch.handleContainerTerminatedEvent(contEvent)
Expand All @@ -252,14 +253,14 @@ func (ch *ContainerHandler) StartMainHandler() error {
}()

for {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "mainContainerHandler")
contEvent := <-ch.containersEventChan
err := ch.handleNewContainerEvent(contEvent, ctx)
err := ch.handleNewContainerEvent(contEvent)
if err != nil {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "mainContainerHandler")
if !errors.Is(err, containerAlreadyExistError) {
logger.L().Ctx(ctx).Warning("fail to handle new container", helpers.String("ContainerID", contEvent.GetContainerID()), helpers.String("Container name", contEvent.GetContainerID()), helpers.String("k8s workload", contEvent.GetK8SWorkloadID()), helpers.Error(err))
}
span.End()
}
span.End()
}
}
3 changes: 1 addition & 2 deletions pkg/conthandler/container_main_handler_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package conthandler

import (
"context"
"path"
"sniffer/pkg/config"
configV1 "sniffer/pkg/config/v1"
Expand Down Expand Up @@ -65,7 +64,7 @@ func TestContMainHandler(t *testing.T) {
t.Fatalf("container ID is wrong, get: %s expected: %s", event.GetContainerID(), RedisContainerIDContHandler)
}
time.Sleep(12 * time.Second)
err = contHandler.handleNewContainerEvent(event, context.Background())
err = contHandler.handleNewContainerEvent(event)
if err != nil {
t.Fatalf("handleNewContainerEvent failed with error %v", err)
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/event_data_storage/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ func (acc *Accumulator) streamEventToRegisterContainer(event *evData.EventData)

func (acc *Accumulator) accumulateEbpfEngineData() {
for {
ctx, span := otel.Tracer("").Start(context.GetBackgroundContext(), "accumulator.accumulateEbpfEngineData")
event := <-acc.eventChannel
if nodeAgentContainerID != "" && strings.Contains(event.GetEventContainerID(), nodeAgentContainerID) {
continue
Expand All @@ -180,8 +179,8 @@ func (acc *Accumulator) accumulateEbpfEngineData() {
} else {
index, newSlotIsNeeded, err := acc.findIndexByTimestamp(event)
if err != nil {
logger.L().Ctx(ctx).Warning("findIndexByTimestamp fail to find the index to insert the event", helpers.Error(err))
logger.L().Ctx(ctx).Warning("event that didn't store", helpers.String("event", fmt.Sprintf("%v", event)))
logger.L().Ctx(context.GetBackgroundContext()).Warning("findIndexByTimestamp fail to find the index to insert the event", helpers.Error(err))
logger.L().Ctx(context.GetBackgroundContext()).Warning("event that didn't store", helpers.String("event", fmt.Sprintf("%v", event)))
continue
}
if newSlotIsNeeded {
Expand All @@ -191,7 +190,6 @@ func (acc *Accumulator) accumulateEbpfEngineData() {
acc.streamEventToRegisterContainer(event)
}
}
span.End()
}
}

Expand Down