Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/twelve-teachers-sleep.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/protocol": patch
---

update agent reporter for cloud agents
46 changes: 23 additions & 23 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,41 @@ require (
github.com/fsnotify/fsnotify v1.9.0
github.com/gammazero/deque v1.0.0
github.com/go-jose/go-jose/v3 v3.0.4
github.com/go-logr/logr v1.4.2
github.com/go-logr/logr v1.4.3
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/jxskiss/base62 v1.1.0
github.com/lithammer/shortuuid/v4 v4.2.0
github.com/livekit/mageutil v0.0.0-20250511045019-0f1ff63f7731
github.com/livekit/psrpc v0.6.1-0.20250511053145-465289d72c3c
github.com/mackerelio/go-osstat v0.2.5
github.com/maxbrunsfeld/counterfeiter/v6 v6.11.1
github.com/pion/logging v0.2.3
github.com/pion/sdp/v3 v3.0.11
github.com/pion/webrtc/v4 v4.1.0
github.com/pion/logging v0.2.4
github.com/pion/sdp/v3 v3.0.14
github.com/pion/webrtc/v4 v4.1.2
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.22.0
github.com/prometheus/procfs v0.16.1
github.com/puzpuzpuz/xsync/v3 v3.5.1
github.com/redis/go-redis/v9 v9.8.0
github.com/redis/go-redis/v9 v9.11.0
github.com/stretchr/testify v1.10.0
github.com/twitchtv/twirp v8.1.3+incompatible
github.com/zeebo/xxh3 v1.0.2
go.uber.org/atomic v1.11.0
go.uber.org/multierr v1.11.0
go.uber.org/zap v1.27.0
go.uber.org/zap/exp v0.3.0
golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6
golang.org/x/mod v0.24.0
golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b
golang.org/x/mod v0.25.0
golang.org/x/sys v0.33.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20250512202823-5a2f75b736a9
google.golang.org/grpc v1.72.1
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822
google.golang.org/grpc v1.73.0
google.golang.org/protobuf v1.36.6
gopkg.in/yaml.v3 v3.0.1
)

require (
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.6-20250425153114-8976f5be98c1.1 // indirect
buf.build/go/protovalidate v0.12.0 // indirect
buf.build/gen/go/bufbuild/protovalidate/protocolbuffers/go v1.36.6-20250625184727-c923a0c2a132.1 // indirect
buf.build/go/protovalidate v0.13.1 // indirect
cel.dev/expr v0.24.0 // indirect
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
Expand All @@ -55,34 +55,34 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/klauspost/compress v1.18.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
github.com/klauspost/cpuid/v2 v2.2.11 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nats.go v1.42.0 // indirect
github.com/nats-io/nats.go v1.43.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pion/datachannel v1.5.10 // indirect
github.com/pion/dtls/v3 v3.0.6 // indirect
github.com/pion/ice/v4 v4.0.10 // indirect
github.com/pion/interceptor v0.1.37 // indirect
github.com/pion/interceptor v0.1.40 // indirect
github.com/pion/mdns/v2 v2.0.7 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.15 // indirect
github.com/pion/rtp v1.8.15 // indirect
github.com/pion/rtp v1.8.19 // indirect
github.com/pion/sctp v1.8.39 // indirect
github.com/pion/srtp/v3 v3.0.4 // indirect
github.com/pion/srtp/v3 v3.0.6 // indirect
github.com/pion/stun/v3 v3.0.0 // indirect
github.com/pion/transport/v3 v3.0.7 // indirect
github.com/pion/turn/v4 v4.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.64.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/stoewer/go-strcase v1.3.1 // indirect
github.com/wlynxg/anet v0.0.5 // indirect
golang.org/x/crypto v0.38.0 // indirect
golang.org/x/net v0.40.0 // indirect
golang.org/x/sync v0.14.0 // indirect
golang.org/x/text v0.25.0 // indirect
golang.org/x/tools v0.33.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250512202823-5a2f75b736a9 // indirect
golang.org/x/crypto v0.39.0 // indirect
golang.org/x/net v0.41.0 // indirect
golang.org/x/sync v0.15.0 // indirect
golang.org/x/text v0.26.0 // indirect
golang.org/x/tools v0.34.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect
)
112 changes: 56 additions & 56 deletions go.sum

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions livekit/livekit_agent.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 30 additions & 18 deletions observability/agentsobs/gen_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"
)

const Version_9IAJBGO = true
const Version_6JD79I0 = true

type KeyResolver interface {
Resolve(string)
Expand All @@ -22,11 +22,21 @@ type ProjectReporter interface {
RegisterFunc(func(ts time.Time, tx ProjectTx) bool)
Tx(func(tx ProjectTx))
TxAt(time.Time, func(tx ProjectTx))
WithCloudAgent(id string) CloudAgentReporter
WithDeferredCloudAgent() (CloudAgentReporter, KeyResolver)
}

type ProjectTx interface{}

type CloudAgentReporter interface {
RegisterFunc(func(ts time.Time, tx CloudAgentTx) bool)
Tx(func(tx CloudAgentTx))
TxAt(time.Time, func(tx CloudAgentTx))
WithAgent(name string) AgentReporter
WithDeferredAgent() (AgentReporter, KeyResolver)
}

type ProjectTx interface{}
type CloudAgentTx interface{}

type AgentReporter interface {
RegisterFunc(func(ts time.Time, tx AgentTx) bool)
Expand All @@ -44,35 +54,35 @@ type WorkerReporter interface {
TxAt(time.Time, func(tx WorkerTx))
WithJob(id string) JobReporter
WithDeferredJob() (JobReporter, KeyResolver)
ReportLoad(v float32)
ReportStatus(v WorkerStatus)
ReportStartTime(v time.Time)
ReportJobsCurrent(v uint32)
ReportKind(v AgentKind)
ReportCPU(v int64)
ReportCPULimit(v int64)
ReportMem(v int64)
ReportMemLimit(v int64)
ReportLoad(v float32)
ReportStatus(v WorkerStatus)
ReportRegion(v string)
ReportVersion(v string)
ReportSdkVersion(v string)
ReportState(v WorkerState)
ReportStartedAt(v time.Time)
ReportJobsCurrent(v uint16)
ReportKind(v AgentKind)
}

type WorkerTx interface {
ReportLoad(v float32)
ReportStatus(v WorkerStatus)
ReportStartTime(v time.Time)
ReportJobsCurrent(v uint32)
ReportKind(v AgentKind)
ReportCPU(v int64)
ReportCPULimit(v int64)
ReportMem(v int64)
ReportMemLimit(v int64)
ReportLoad(v float32)
ReportStatus(v WorkerStatus)
ReportRegion(v string)
ReportVersion(v string)
ReportSdkVersion(v string)
ReportState(v WorkerState)
ReportStartedAt(v time.Time)
ReportJobsCurrent(v uint16)
ReportKind(v AgentKind)
}

type JobReporter interface {
Expand All @@ -83,17 +93,19 @@ type JobReporter interface {
ReportKind(v JobKind)
ReportStatus(v JobStatus)
ReportDuration(v uint32)
ReportDispatchedAt(v time.Time)
ReportJoinedAt(v time.Time)
ReportCompletedAt(v time.Time)
ReportDurationMinutes(v uint8)
ReportStartTime(v time.Time)
ReportEndTime(v time.Time)
ReportJoinLatency(v uint32)
}

type JobTx interface {
ReportRoomSessionID(v string)
ReportKind(v JobKind)
ReportStatus(v JobStatus)
ReportDuration(v uint32)
ReportDispatchedAt(v time.Time)
ReportJoinedAt(v time.Time)
ReportCompletedAt(v time.Time)
ReportDurationMinutes(v uint8)
ReportStartTime(v time.Time)
ReportEndTime(v time.Time)
ReportJoinLatency(v uint32)
}
48 changes: 33 additions & 15 deletions observability/agentsobs/gen_reporter_noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
)

var (
_ Reporter = (*noopReporter)(nil)
_ ProjectReporter = (*noopProjectReporter)(nil)
_ AgentReporter = (*noopAgentReporter)(nil)
_ WorkerReporter = (*noopWorkerReporter)(nil)
_ JobReporter = (*noopJobReporter)(nil)
_ Reporter = (*noopReporter)(nil)
_ ProjectReporter = (*noopProjectReporter)(nil)
_ CloudAgentReporter = (*noopCloudAgentReporter)(nil)
_ AgentReporter = (*noopAgentReporter)(nil)
_ WorkerReporter = (*noopWorkerReporter)(nil)
_ JobReporter = (*noopJobReporter)(nil)
)

type noopKeyResolver struct{}
Expand Down Expand Up @@ -42,10 +43,26 @@ func NewNoopProjectReporter() ProjectReporter {
func (r *noopProjectReporter) RegisterFunc(f func(ts time.Time, tx ProjectTx) bool) {}
func (r *noopProjectReporter) Tx(f func(ProjectTx)) {}
func (r *noopProjectReporter) TxAt(ts time.Time, f func(ProjectTx)) {}
func (r *noopProjectReporter) WithAgent(name string) AgentReporter {
func (r *noopProjectReporter) WithCloudAgent(id string) CloudAgentReporter {
return &noopCloudAgentReporter{}
}
func (r *noopProjectReporter) WithDeferredCloudAgent() (CloudAgentReporter, KeyResolver) {
return &noopCloudAgentReporter{}, noopKeyResolver{}
}

type noopCloudAgentReporter struct{}

func NewNoopCloudAgentReporter() CloudAgentReporter {
return &noopCloudAgentReporter{}
}

func (r *noopCloudAgentReporter) RegisterFunc(f func(ts time.Time, tx CloudAgentTx) bool) {}
func (r *noopCloudAgentReporter) Tx(f func(CloudAgentTx)) {}
func (r *noopCloudAgentReporter) TxAt(ts time.Time, f func(CloudAgentTx)) {}
func (r *noopCloudAgentReporter) WithAgent(name string) AgentReporter {
return &noopAgentReporter{}
}
func (r *noopProjectReporter) WithDeferredAgent() (AgentReporter, KeyResolver) {
func (r *noopCloudAgentReporter) WithDeferredAgent() (AgentReporter, KeyResolver) {
return &noopAgentReporter{}, noopKeyResolver{}
}

Expand Down Expand Up @@ -74,19 +91,19 @@ func NewNoopWorkerReporter() WorkerReporter {
func (r *noopWorkerReporter) RegisterFunc(f func(ts time.Time, tx WorkerTx) bool) {}
func (r *noopWorkerReporter) Tx(f func(WorkerTx)) {}
func (r *noopWorkerReporter) TxAt(ts time.Time, f func(WorkerTx)) {}
func (r *noopWorkerReporter) ReportLoad(v float32) {}
func (r *noopWorkerReporter) ReportStatus(v WorkerStatus) {}
func (r *noopWorkerReporter) ReportStartTime(v time.Time) {}
func (r *noopWorkerReporter) ReportJobsCurrent(v uint32) {}
func (r *noopWorkerReporter) ReportKind(v AgentKind) {}
func (r *noopWorkerReporter) ReportCPU(v int64) {}
func (r *noopWorkerReporter) ReportCPULimit(v int64) {}
func (r *noopWorkerReporter) ReportMem(v int64) {}
func (r *noopWorkerReporter) ReportMemLimit(v int64) {}
func (r *noopWorkerReporter) ReportLoad(v float32) {}
func (r *noopWorkerReporter) ReportStatus(v WorkerStatus) {}
func (r *noopWorkerReporter) ReportRegion(v string) {}
func (r *noopWorkerReporter) ReportVersion(v string) {}
func (r *noopWorkerReporter) ReportSdkVersion(v string) {}
func (r *noopWorkerReporter) ReportState(v WorkerState) {}
func (r *noopWorkerReporter) ReportStartedAt(v time.Time) {}
func (r *noopWorkerReporter) ReportJobsCurrent(v uint16) {}
func (r *noopWorkerReporter) ReportKind(v AgentKind) {}
func (r *noopWorkerReporter) WithJob(id string) JobReporter {
return &noopJobReporter{}
}
Expand All @@ -107,6 +124,7 @@ func (r *noopJobReporter) ReportRoomSessionID(v string) {}
func (r *noopJobReporter) ReportKind(v JobKind) {}
func (r *noopJobReporter) ReportStatus(v JobStatus) {}
func (r *noopJobReporter) ReportDuration(v uint32) {}
func (r *noopJobReporter) ReportDispatchedAt(v time.Time) {}
func (r *noopJobReporter) ReportJoinedAt(v time.Time) {}
func (r *noopJobReporter) ReportCompletedAt(v time.Time) {}
func (r *noopJobReporter) ReportDurationMinutes(v uint8) {}
func (r *noopJobReporter) ReportStartTime(v time.Time) {}
func (r *noopJobReporter) ReportEndTime(v time.Time) {}
func (r *noopJobReporter) ReportJoinLatency(v uint32) {}
17 changes: 9 additions & 8 deletions observability/agentsobs/gen_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,6 @@ const (
WorkerStatusFull WorkerStatus = "full"
)

type WorkerState string

const (
WorkerStateUndefined WorkerState = ""
WorkerStateOnline WorkerState = "online"
WorkerStateOffline WorkerState = "offline"
)

type AgentKind string

const (
Expand All @@ -25,6 +17,14 @@ const (
AgentKindSelfhost AgentKind = "selfhost"
)

type WorkerState string

const (
WorkerStateUndefined WorkerState = ""
WorkerStateOnline WorkerState = "online"
WorkerStateOffline WorkerState = "offline"
)

type JobKind string

const (
Expand All @@ -48,6 +48,7 @@ type Rollup string

const (
RollupUndefined Rollup = ""
RollupAgent Rollup = "agent"
RollupWorker Rollup = "worker"
RollupWorkerSeries Rollup = "worker_series"
RollupJob Rollup = "job"
Expand Down
4 changes: 1 addition & 3 deletions observability/roomobs/room.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ func UnpackCountryCode(code uint16) (isoAlpha2 string) {
}

func ToClientOS(os string) ClientOS {
os = strings.ToLower(os)

switch os {
switch strings.ToLower(os) {
case "":
return ClientOSUndefined
case "ios":
Expand Down
1 change: 1 addition & 0 deletions protobufs/livekit_agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ message JobState {
int64 updated_at = 5;
string participant_identity = 6;
string worker_id = 7;
string agent_id = 8;
}

// from Worker to Server
Expand Down
Loading