From 35ae4ae8bec9809a6c2cdb06d7a531606dec9a4f Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 9 Jun 2022 13:07:37 -0400 Subject: [PATCH 1/6] workerStatus --- sdks/go/pkg/beam/core/core.go | 2 +- sdks/go/pkg/beam/core/metrics/store.go | 8 ++ .../pkg/beam/core/runtime/harness/harness.go | 25 ++--- .../runtime/harness/statecache/statecache.go | 4 + .../core/runtime/harness/worker_status.go | 36 ++++++- .../runtime/harness/worker_status_test.go | 3 +- .../test/integration/wordcount/wordcount.go | 9 +- .../integration/wordcount/wordcount_test.go | 102 +++++++++--------- 8 files changed, 116 insertions(+), 73 deletions(-) diff --git a/sdks/go/pkg/beam/core/core.go b/sdks/go/pkg/beam/core/core.go index e60e9e471c90..687726971545 100644 --- a/sdks/go/pkg/beam/core/core.go +++ b/sdks/go/pkg/beam/core/core.go @@ -27,5 +27,5 @@ const ( // SdkName is the human readable name of the SDK for UserAgents. SdkName = "Apache Beam SDK for Go" // SdkVersion is the current version of the SDK. - SdkVersion = "2.40.0.dev" + SdkVersion = "2.39.0" ) diff --git a/sdks/go/pkg/beam/core/metrics/store.go b/sdks/go/pkg/beam/core/metrics/store.go index 7e2240a188ec..8ad01c9d4dd2 100644 --- a/sdks/go/pkg/beam/core/metrics/store.go +++ b/sdks/go/pkg/beam/core/metrics/store.go @@ -218,3 +218,11 @@ func (b *Store) storeMetric(pid string, n name, m userMetric) { } b.store[l] = m } + +func (b *Store) BundleState() *BundleState { + return b.bundleState +} + +func (b *Store) StateRegistry() map[string]*[4]ExecutionState { + return b.stateRegistry +} diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 04a2baf41b4b..097b5573ba0e 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -112,18 +112,6 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options log.Debugf(ctx, "control response channel closed") }() - // if the runner supports worker status api then expose SDK harness status - if statusEndpoint != "" { - statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint) - if err != nil { - log.Errorf(ctx, "error establishing connection to worker status API: %v", err) - } else { - if err := statusHandler.start(ctx); err == nil { - defer statusHandler.stop(ctx) - } - } - } - sideCache := statecache.SideInputCache{} sideCache.Init(cacheSize) @@ -140,6 +128,19 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options state: &StateChannelManager{}, cache: &sideCache, } + + // if the runner supports worker status api then expose SDK harness status + if statusEndpoint != "" { + statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint, ctrl.metStore, ctrl.cache) + if err != nil { + log.Errorf(ctx, "error establishing connection to worker status API: %v", err) + } else { + if err := statusHandler.start(ctx); err == nil { + defer statusHandler.stop(ctx) + } + } + } + // gRPC requires all readers of a stream be the same goroutine, so this goroutine // is responsible for managing the network data. All it does is pull data from // the stream, and hand off the message to a goroutine to actually be handled, diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index 73430869b2e9..e75981d4dec0 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -272,3 +272,7 @@ func (c *SideInputCache) evictElement(ctx context.Context) { } } } + +func (c *SideInputCache) CacheMetrics() CacheMetrics { + return c.metrics +} diff --git a/sdks/go/pkg/beam/core/runtime/harness/worker_status.go b/sdks/go/pkg/beam/core/runtime/harness/worker_status.go index bf5a9a9099b1..6213d50a1545 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/worker_status.go +++ b/sdks/go/pkg/beam/core/runtime/harness/worker_status.go @@ -17,6 +17,9 @@ package harness import ( "context" + "fmt" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache" "io" "runtime" "sync" @@ -34,14 +37,16 @@ type workerStatusHandler struct { conn *grpc.ClientConn shouldShutdown int32 wg sync.WaitGroup + metStore map[instructionID]*metrics.Store //*metrics.Store for active bundles + cache *statecache.SideInputCache } -func newWorkerStatusHandler(ctx context.Context, endpoint string) (*workerStatusHandler, error) { +func newWorkerStatusHandler(ctx context.Context, endpoint string, metStore map[instructionID]*metrics.Store, cache *statecache.SideInputCache) (*workerStatusHandler, error) { sconn, err := dial(ctx, endpoint, 60*time.Second) if err != nil { return nil, errors.Wrapf(err, "failed to connect: %v\n", endpoint) } - return &workerStatusHandler{conn: sconn, shouldShutdown: 0}, nil + return &workerStatusHandler{conn: sconn, shouldShutdown: 0, metStore: metStore, cache: cache}, nil } func (w *workerStatusHandler) isAlive() bool { @@ -65,6 +70,29 @@ func (w *workerStatusHandler) start(ctx context.Context) error { return nil } +func memoryUsage() string { + m := runtime.MemStats{} + runtime.ReadMemStats(&m) + return fmt.Sprintf("\n Total Alloc: %v\n Sys: %v\n Mallocs: %v\n Frees: %v\n HeapAlloc: %v", m.TotalAlloc, m.Sys, m.Mallocs, m.Frees, m.HeapAlloc) +} + +func (w *workerStatusHandler) activeProcessBundleStates() string { + var states string + for bundleID, store := range w.metStore { + execStates := "" + for bundleID, state := range store.StateRegistry() { + execStates += fmt.Sprintf("ID: %v Execution States: %v,\n", bundleID, *state) + + } + states += fmt.Sprintf("\nBundle ID: %v\nBundle State: %v\nBundle Execution States: %#v\n", bundleID, *store.BundleState(), store.StateRegistry()) + } + return states +} + +func (w *workerStatusHandler) cacheStats() string { + return fmt.Sprintf("Cache:\n%v", w.cache.CacheMetrics()) +} + // reader reads the WorkerStatusRequest from the stream and sends a processed WorkerStatusResponse to // a response channel. func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorkerStatus_WorkerStatusClient) { @@ -78,7 +106,9 @@ func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorker } log.Debugf(ctx, "RECV-status: %v", req.GetId()) runtime.Stack(buf, true) - response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: string(buf)} + statusInfo := fmt.Sprintf("\n============Memory Usage============\n%s\n============Active Process Bundle States============\n%s\n============Cache Stats============\n%s\n============Goroutine Dump============\n%s\n", memoryUsage(), w.activeProcessBundleStates(), w.cacheStats(), string(buf)) + log.Info(ctx, statusInfo) + response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: statusInfo} if err := stub.Send(response); err != nil && err != io.EOF { log.Errorf(ctx, "workerStatus.Writer: Failed to respond: %v", err) } diff --git a/sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go b/sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go index 6fc6ab0ca91d..e0a6f7ef53e1 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go +++ b/sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go @@ -18,6 +18,7 @@ package harness import ( "context" "fmt" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache" "log" "net" "testing" @@ -75,7 +76,7 @@ func TestSendStatusResponse(t *testing.T) { t.Fatalf("unable to start test server: %v", err) } - statusHandler := workerStatusHandler{conn: conn} + statusHandler := workerStatusHandler{conn: conn, cache: &statecache.SideInputCache{}} if err := statusHandler.start(ctx); err != nil { t.Fatal(err) } diff --git a/sdks/go/test/integration/wordcount/wordcount.go b/sdks/go/test/integration/wordcount/wordcount.go index ee18cf688943..bfe97842ff96 100644 --- a/sdks/go/test/integration/wordcount/wordcount.go +++ b/sdks/go/test/integration/wordcount/wordcount.go @@ -18,15 +18,14 @@ package wordcount import ( "context" - "regexp" - "strings" - "fmt" - "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" + "regexp" + "strings" + "time" ) var ( @@ -57,6 +56,7 @@ func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { // extractFn is a DoFn that emits the words in a given line. func extractFn(ctx context.Context, line string, emit func(string)) { + time.Sleep(time.Second * 660) lineLen.Update(ctx, int64(len(line))) if len(strings.TrimSpace(line)) == 0 { empty.Inc(ctx, 1) @@ -81,7 +81,6 @@ func formatFn(w string, c int) string { // WordCount returns a self-validating wordcount pipeline. func WordCount(glob, hash string, size int) *beam.Pipeline { p, s := beam.NewPipelineWithRoot() - in := textio.Read(s, glob) WordCountFromPCol(s, in, hash, size) return p diff --git a/sdks/go/test/integration/wordcount/wordcount_test.go b/sdks/go/test/integration/wordcount/wordcount_test.go index 09c6683cd14e..6333dbfe4547 100644 --- a/sdks/go/test/integration/wordcount/wordcount_test.go +++ b/sdks/go/test/integration/wordcount/wordcount_test.go @@ -50,57 +50,57 @@ func TestWordCount(t *testing.T) { "wordcount.extractFn", 1, }, - { - []string{ - "foo foo foo", - "foo foo", - "foo", - }, - 1, - "jAk8+k4BOH7vQDUiUZdfWg==", - 6, - metrics.DistributionValue{Count: 3, Sum: 21, Min: 3, Max: 11}, - "extractFn", - 1, - }, - { - []string{ - "bar bar foo bar foo foo", - }, - 2, - "Nz70m/sn3Ep9o484r7MalQ==", - 6, - metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23}, - "CountFn", - 1, - }, - { - []string{ - "foo bar foo bar foo bar", - }, - 2, - "Nz70m/sn3Ep9o484r7MalQ==", // ordering doesn't matter: same hash as above - 6, - metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23}, - "extract", - 1, - }, - { - []string{ - "", - "bar foo bar", - " \t ", - " \n\n\n ", - "foo bar", - " foo", - }, - 2, - "Nz70m/sn3Ep9o484r7MalQ==", // whitespace doesn't matter: same hash as above - 6, - metrics.DistributionValue{Count: 6, Sum: 37, Min: 0, Max: 11}, - "CreateFn", - 0, - }, + //{ + // []string{ + // "foo foo foo", + // "foo foo", + // "foo", + // }, + // 1, + // "jAk8+k4BOH7vQDUiUZdfWg==", + // 6, + // metrics.DistributionValue{Count: 3, Sum: 21, Min: 3, Max: 11}, + // "extractFn", + // 1, + //}, + //{ + // []string{ + // "bar bar foo bar foo foo", + // }, + // 2, + // "Nz70m/sn3Ep9o484r7MalQ==", + // 6, + // metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23}, + // "CountFn", + // 1, + //}, + //{ + // []string{ + // "foo bar foo bar foo bar", + // }, + // 2, + // "Nz70m/sn3Ep9o484r7MalQ==", // ordering doesn't matter: same hash as above + // 6, + // metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23}, + // "extract", + // 1, + //}, + //{ + // []string{ + // "", + // "bar foo bar", + // " \t ", + // " \n\n\n ", + // "foo bar", + // " foo", + // }, + // 2, + // "Nz70m/sn3Ep9o484r7MalQ==", // whitespace doesn't matter: same hash as above + // 6, + // metrics.DistributionValue{Count: 6, Sum: 37, Min: 0, Max: 11}, + // "CreateFn", + // 0, + //}, } for _, test := range tests { From 1801d67fa83a05f3284efa99565a36c7c4af75f3 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 9 Jun 2022 15:50:24 -0400 Subject: [PATCH 2/6] add doc, formatting --- sdks/go/pkg/beam/core/core.go | 2 +- sdks/go/pkg/beam/core/metrics/store.go | 2 + .../runtime/harness/statecache/statecache.go | 1 + .../core/runtime/harness/worker_status.go | 18 ++-- .../test/integration/wordcount/wordcount.go | 2 - .../integration/wordcount/wordcount_test.go | 102 +++++++++--------- 6 files changed, 67 insertions(+), 60 deletions(-) diff --git a/sdks/go/pkg/beam/core/core.go b/sdks/go/pkg/beam/core/core.go index 687726971545..e60e9e471c90 100644 --- a/sdks/go/pkg/beam/core/core.go +++ b/sdks/go/pkg/beam/core/core.go @@ -27,5 +27,5 @@ const ( // SdkName is the human readable name of the SDK for UserAgents. SdkName = "Apache Beam SDK for Go" // SdkVersion is the current version of the SDK. - SdkVersion = "2.39.0" + SdkVersion = "2.40.0.dev" ) diff --git a/sdks/go/pkg/beam/core/metrics/store.go b/sdks/go/pkg/beam/core/metrics/store.go index 8ad01c9d4dd2..f8b8382c02bf 100644 --- a/sdks/go/pkg/beam/core/metrics/store.go +++ b/sdks/go/pkg/beam/core/metrics/store.go @@ -219,10 +219,12 @@ func (b *Store) storeMetric(pid string, n name, m userMetric) { b.store[l] = m } +// BundleState returns the bundle state. func (b *Store) BundleState() *BundleState { return b.bundleState } +// StateRegistry returns the state registry that stores bundleID to executions states mapping. func (b *Store) StateRegistry() map[string]*[4]ExecutionState { return b.stateRegistry } diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index e75981d4dec0..34ac998b30de 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -273,6 +273,7 @@ func (c *SideInputCache) evictElement(ctx context.Context) { } } +// CacheMetrics returns the cache metrics for current side input cache. func (c *SideInputCache) CacheMetrics() CacheMetrics { return c.metrics } diff --git a/sdks/go/pkg/beam/core/runtime/harness/worker_status.go b/sdks/go/pkg/beam/core/runtime/harness/worker_status.go index 6213d50a1545..5cbc9b1d9e44 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/worker_status.go +++ b/sdks/go/pkg/beam/core/runtime/harness/worker_status.go @@ -73,7 +73,7 @@ func (w *workerStatusHandler) start(ctx context.Context) error { func memoryUsage() string { m := runtime.MemStats{} runtime.ReadMemStats(&m) - return fmt.Sprintf("\n Total Alloc: %v\n Sys: %v\n Mallocs: %v\n Frees: %v\n HeapAlloc: %v", m.TotalAlloc, m.Sys, m.Mallocs, m.Frees, m.HeapAlloc) + return fmt.Sprintf("\n Total Alloc: %v bytes \n Sys: %v bytes \n Mallocs: %v\n Frees: %v\n HeapAlloc: %v bytes", m.TotalAlloc, m.Sys, m.Mallocs, m.Frees, m.HeapAlloc) } func (w *workerStatusHandler) activeProcessBundleStates() string { @@ -81,10 +81,10 @@ func (w *workerStatusHandler) activeProcessBundleStates() string { for bundleID, store := range w.metStore { execStates := "" for bundleID, state := range store.StateRegistry() { - execStates += fmt.Sprintf("ID: %v Execution States: %v,\n", bundleID, *state) + execStates += fmt.Sprintf("ID: %v Execution States: %#v,", bundleID, *state) } - states += fmt.Sprintf("\nBundle ID: %v\nBundle State: %v\nBundle Execution States: %#v\n", bundleID, *store.BundleState(), store.StateRegistry()) + states += fmt.Sprintf("\nBundle ID: %v\nBundle State: %#v\nBundle Execution States: %v\n", bundleID, *store.BundleState(), execStates) } return states } @@ -93,11 +93,17 @@ func (w *workerStatusHandler) cacheStats() string { return fmt.Sprintf("Cache:\n%v", w.cache.CacheMetrics()) } +func goroutineDump() string { + buf := make([]byte, 1<<16) + runtime.Stack(buf, true) + return string(buf) +} + // reader reads the WorkerStatusRequest from the stream and sends a processed WorkerStatusResponse to // a response channel. func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorkerStatus_WorkerStatusClient) { defer w.wg.Done() - buf := make([]byte, 1<<16) + for w.isAlive() { req, err := stub.Recv() if err != nil && err != io.EOF { @@ -105,8 +111,8 @@ func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorker return } log.Debugf(ctx, "RECV-status: %v", req.GetId()) - runtime.Stack(buf, true) - statusInfo := fmt.Sprintf("\n============Memory Usage============\n%s\n============Active Process Bundle States============\n%s\n============Cache Stats============\n%s\n============Goroutine Dump============\n%s\n", memoryUsage(), w.activeProcessBundleStates(), w.cacheStats(), string(buf)) + + statusInfo := fmt.Sprintf("\n============Memory Usage============\n%s\n============Active Process Bundle States============\n%s\n============Cache Stats============\n%s\n============Goroutine Dump============\n%s\n", memoryUsage(), w.activeProcessBundleStates(), w.cacheStats(), goroutineDump()) log.Info(ctx, statusInfo) response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: statusInfo} if err := stub.Send(response); err != nil && err != io.EOF { diff --git a/sdks/go/test/integration/wordcount/wordcount.go b/sdks/go/test/integration/wordcount/wordcount.go index bfe97842ff96..0e7215f4c0f8 100644 --- a/sdks/go/test/integration/wordcount/wordcount.go +++ b/sdks/go/test/integration/wordcount/wordcount.go @@ -25,7 +25,6 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" "regexp" "strings" - "time" ) var ( @@ -56,7 +55,6 @@ func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { // extractFn is a DoFn that emits the words in a given line. func extractFn(ctx context.Context, line string, emit func(string)) { - time.Sleep(time.Second * 660) lineLen.Update(ctx, int64(len(line))) if len(strings.TrimSpace(line)) == 0 { empty.Inc(ctx, 1) diff --git a/sdks/go/test/integration/wordcount/wordcount_test.go b/sdks/go/test/integration/wordcount/wordcount_test.go index 6333dbfe4547..09c6683cd14e 100644 --- a/sdks/go/test/integration/wordcount/wordcount_test.go +++ b/sdks/go/test/integration/wordcount/wordcount_test.go @@ -50,57 +50,57 @@ func TestWordCount(t *testing.T) { "wordcount.extractFn", 1, }, - //{ - // []string{ - // "foo foo foo", - // "foo foo", - // "foo", - // }, - // 1, - // "jAk8+k4BOH7vQDUiUZdfWg==", - // 6, - // metrics.DistributionValue{Count: 3, Sum: 21, Min: 3, Max: 11}, - // "extractFn", - // 1, - //}, - //{ - // []string{ - // "bar bar foo bar foo foo", - // }, - // 2, - // "Nz70m/sn3Ep9o484r7MalQ==", - // 6, - // metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23}, - // "CountFn", - // 1, - //}, - //{ - // []string{ - // "foo bar foo bar foo bar", - // }, - // 2, - // "Nz70m/sn3Ep9o484r7MalQ==", // ordering doesn't matter: same hash as above - // 6, - // metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23}, - // "extract", - // 1, - //}, - //{ - // []string{ - // "", - // "bar foo bar", - // " \t ", - // " \n\n\n ", - // "foo bar", - // " foo", - // }, - // 2, - // "Nz70m/sn3Ep9o484r7MalQ==", // whitespace doesn't matter: same hash as above - // 6, - // metrics.DistributionValue{Count: 6, Sum: 37, Min: 0, Max: 11}, - // "CreateFn", - // 0, - //}, + { + []string{ + "foo foo foo", + "foo foo", + "foo", + }, + 1, + "jAk8+k4BOH7vQDUiUZdfWg==", + 6, + metrics.DistributionValue{Count: 3, Sum: 21, Min: 3, Max: 11}, + "extractFn", + 1, + }, + { + []string{ + "bar bar foo bar foo foo", + }, + 2, + "Nz70m/sn3Ep9o484r7MalQ==", + 6, + metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23}, + "CountFn", + 1, + }, + { + []string{ + "foo bar foo bar foo bar", + }, + 2, + "Nz70m/sn3Ep9o484r7MalQ==", // ordering doesn't matter: same hash as above + 6, + metrics.DistributionValue{Count: 1, Sum: 23, Min: 23, Max: 23}, + "extract", + 1, + }, + { + []string{ + "", + "bar foo bar", + " \t ", + " \n\n\n ", + "foo bar", + " foo", + }, + 2, + "Nz70m/sn3Ep9o484r7MalQ==", // whitespace doesn't matter: same hash as above + 6, + metrics.DistributionValue{Count: 6, Sum: 37, Min: 0, Max: 11}, + "CreateFn", + 0, + }, } for _, test := range tests { From ac85c087844d5eb0333bcd0bb8833ea7638af444 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 9 Jun 2022 16:21:29 -0400 Subject: [PATCH 3/6] add env, remove log --- sdks/go/container/boot.go | 2 +- sdks/go/pkg/beam/core/runtime/harness/harness.go | 16 +++------------- .../pkg/beam/core/runtime/harness/init/init.go | 2 +- .../beam/core/runtime/harness/worker_status.go | 1 - 4 files changed, 5 insertions(+), 16 deletions(-) diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go index 88b654230c1d..5617e25109f4 100644 --- a/sdks/go/container/boot.go +++ b/sdks/go/container/boot.go @@ -108,7 +108,7 @@ func main() { "--options=" + options, } if info.GetStatusEndpoint() != nil { - args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl()) + os.Setenv("STATUS_ENDPOINT", info.GetStatusEndpoint().GetUrl()) } log.Fatalf("User program exited: %v", execx.Execute(prog, args...)) diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 097b5573ba0e..c0371531b850 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "os" "sync" "sync/atomic" "time" @@ -37,26 +38,15 @@ import ( "google.golang.org/protobuf/types/known/durationpb" ) -// StatusAddress is a type of status endpoint address as an optional argument to harness.Main(). -type StatusAddress string - // TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin). // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not // "pipeline-construction time" -- on each worker. It is a FnAPI client and // ultimately responsible for correctly executing user code. -func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options ...interface{}) error { +func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { hooks.DeserializeHooksFromOptions(ctx) - statusEndpoint := "" - for _, option := range options { - switch option := option.(type) { - case StatusAddress: - statusEndpoint = string(option) - default: - return errors.Errorf("unknown type %T, value %v in error call", option, option) - } - } + statusEndpoint := os.Getenv("STATUS_ENDPOINT") // Pass in the logging endpoint for use w/the default remote logging hook. ctx = context.WithValue(ctx, loggingEndpointCtxKey, loggingEndpoint) diff --git a/sdks/go/pkg/beam/core/runtime/harness/init/init.go b/sdks/go/pkg/beam/core/runtime/harness/init/init.go index fb9e1321edbd..fc5385793835 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/init/init.go +++ b/sdks/go/pkg/beam/core/runtime/harness/init/init.go @@ -108,7 +108,7 @@ func hook() { // does, and establish the background context here. ctx := grpcx.WriteWorkerID(context.Background(), *id) - if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint, harness.StatusAddress(*statusEndpoint)); err != nil { + if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint); err != nil { fmt.Fprintf(os.Stderr, "Worker failed: %v\n", err) switch ShutdownMode { case Terminate: diff --git a/sdks/go/pkg/beam/core/runtime/harness/worker_status.go b/sdks/go/pkg/beam/core/runtime/harness/worker_status.go index 5cbc9b1d9e44..ddcb9aae77ac 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/worker_status.go +++ b/sdks/go/pkg/beam/core/runtime/harness/worker_status.go @@ -113,7 +113,6 @@ func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorker log.Debugf(ctx, "RECV-status: %v", req.GetId()) statusInfo := fmt.Sprintf("\n============Memory Usage============\n%s\n============Active Process Bundle States============\n%s\n============Cache Stats============\n%s\n============Goroutine Dump============\n%s\n", memoryUsage(), w.activeProcessBundleStates(), w.cacheStats(), goroutineDump()) - log.Info(ctx, statusInfo) response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: statusInfo} if err := stub.Send(response); err != nil && err != io.EOF { log.Errorf(ctx, "workerStatus.Writer: Failed to respond: %v", err) From 1d7e1e37a13a3dafdaa40f5a121f70160bbcdca8 Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Thu, 9 Jun 2022 17:19:34 -0400 Subject: [PATCH 4/6] fix static check --- sdks/go/pkg/beam/core/runtime/harness/init/init.go | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/go/pkg/beam/core/runtime/harness/init/init.go b/sdks/go/pkg/beam/core/runtime/harness/init/init.go index fc5385793835..3f8e2336f5be 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/init/init.go +++ b/sdks/go/pkg/beam/core/runtime/harness/init/init.go @@ -42,7 +42,6 @@ var ( id = flag.String("id", "", "Local identifier (required in worker mode).") loggingEndpoint = flag.String("logging_endpoint", "", "Local logging gRPC endpoint (required in worker mode).") controlEndpoint = flag.String("control_endpoint", "", "Local control gRPC endpoint (required in worker mode).") - statusEndpoint = flag.String("status_endpoint", "", "Local status gRPC endpoint (optional in worker mode).") //lint:ignore U1000 semiPersistDir flag is passed in through the boot container, will need to be removed later semiPersistDir = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional in worker mode).") options = flag.String("options", "", "JSON-encoded pipeline options (required in worker mode).") From c444af22880f81968835cd75963bf6a707a2f3cb Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Fri, 10 Jun 2022 12:56:17 -0400 Subject: [PATCH 5/6] thread safe, clarified messages --- sdks/go/pkg/beam/core/metrics/store.go | 48 +++++++++++-- .../pkg/beam/core/runtime/harness/harness.go | 16 ++++- .../runtime/harness/statecache/statecache.go | 2 + .../core/runtime/harness/worker_status.go | 71 +++++++++++-------- .../runtime/harness/worker_status_test.go | 5 +- .../test/integration/wordcount/wordcount.go | 5 +- 6 files changed, 111 insertions(+), 36 deletions(-) diff --git a/sdks/go/pkg/beam/core/metrics/store.go b/sdks/go/pkg/beam/core/metrics/store.go index f8b8382c02bf..2608b4792dee 100644 --- a/sdks/go/pkg/beam/core/metrics/store.go +++ b/sdks/go/pkg/beam/core/metrics/store.go @@ -17,8 +17,11 @@ package metrics import ( "fmt" + "strings" "sync" + "sync/atomic" "time" + "unsafe" ) // Implementation note: We avoid depending on the FnAPI protos here @@ -156,6 +159,22 @@ type ptCounterSet struct { type bundleProcState int +// String implements the Stringer interface. +func (b bundleProcState) String() string { + switch b { + case StartBundle: + return "START_BUNDLE" + case ProcessBundle: + return "PROCESS_BUNDLE" + case FinishBundle: + return "FINISH_BUNDLE" + case TotalBundle: + return "TOTAL_BUNDLE" + default: + return "unknown process bundle state!" + } +} + const ( // StartBundle indicates starting state of a bundle StartBundle bundleProcState = 0 @@ -174,12 +193,22 @@ type ExecutionState struct { TotalTime time.Duration } +// String implements the Stringer interface. +func (e ExecutionState) String() string { + return fmt.Sprintf("Execution State:\n\t State: %s\n\t IsProcessing: %v\n\t Total time: %v\n", e.State, e.IsProcessing, e.TotalTime) +} + // BundleState stores information about a PTransform for execution time metrics. type BundleState struct { pid string currentState bundleProcState } +// String implements the Stringer interface. +func (b BundleState) String() string { + return fmt.Sprintf("Bundle State:\n\t PTransform ID: %s\n\t Current state: %s", b.pid, b.currentState) +} + // currentStateVal exports the current state of a bundle wrt PTransform. type currentStateVal struct { pid string @@ -220,11 +249,22 @@ func (b *Store) storeMetric(pid string, n name, m userMetric) { } // BundleState returns the bundle state. -func (b *Store) BundleState() *BundleState { - return b.bundleState +func (b *Store) BundleState() string { + bs := *(*BundleState)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&b.bundleState)))) + return bs.String() } // StateRegistry returns the state registry that stores bundleID to executions states mapping. -func (b *Store) StateRegistry() map[string]*[4]ExecutionState { - return b.stateRegistry +func (b *Store) StateRegistry() string { + b.mu.Lock() + defer b.mu.Unlock() + builder := &strings.Builder{} + builder.WriteString("\n | All Bundle Process States | \n") + for bundleID, state := range b.stateRegistry { + builder.WriteString(fmt.Sprintf("\tBundle ID: %s\n", bundleID)) + for i := 0; i < 4; i++ { + builder.WriteString(fmt.Sprintf("\t%s\n", state[i])) + } + } + return builder.String() } diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index c0371531b850..441ec74ec6e8 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -21,6 +21,7 @@ import ( "fmt" "io" "os" + "strings" "sync" "sync/atomic" "time" @@ -46,6 +47,9 @@ import ( func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { hooks.DeserializeHooksFromOptions(ctx) + // Extract environment variables. These are optional runner supported capabilities. + // Expected env variables: + // STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting. statusEndpoint := os.Getenv("STATUS_ENDPOINT") // Pass in the logging endpoint for use w/the default remote logging hook. @@ -121,7 +125,7 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { // if the runner supports worker status api then expose SDK harness status if statusEndpoint != "" { - statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint, ctrl.metStore, ctrl.cache) + statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint, ctrl.cache, func(statusInfo *strings.Builder) { ctrl.metStoreToString(statusInfo) }) if err != nil { log.Errorf(ctx, "error establishing connection to worker status API: %v", err) } else { @@ -269,6 +273,16 @@ type control struct { cache *statecache.SideInputCache } +func (c *control) metStoreToString(statusInfo *strings.Builder) { + c.mu.Lock() + defer c.mu.Unlock() + for bundleID, store := range c.metStore { + statusInfo.WriteString(fmt.Sprintf("Bundle ID: %v\n", bundleID)) + statusInfo.WriteString(fmt.Sprintf("\t%s", store.BundleState())) + statusInfo.WriteString(fmt.Sprintf("\t%s", store.StateRegistry())) + } +} + func (c *control) getOrCreatePlan(bdID bundleDescriptorID) (*exec.Plan, error) { c.mu.Lock() plans, ok := c.plans[bdID] diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index 34ac998b30de..cdbf54ef77d3 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -275,5 +275,7 @@ func (c *SideInputCache) evictElement(ctx context.Context) { // CacheMetrics returns the cache metrics for current side input cache. func (c *SideInputCache) CacheMetrics() CacheMetrics { + c.mu.Lock() + defer c.mu.Unlock() return c.metrics } diff --git a/sdks/go/pkg/beam/core/runtime/harness/worker_status.go b/sdks/go/pkg/beam/core/runtime/harness/worker_status.go index ddcb9aae77ac..e4ce4b7e71a8 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/worker_status.go +++ b/sdks/go/pkg/beam/core/runtime/harness/worker_status.go @@ -18,10 +18,12 @@ package harness import ( "context" "fmt" - "github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics" "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache" "io" "runtime" + "runtime/debug" + "runtime/pprof" + "strings" "sync" "sync/atomic" "time" @@ -34,19 +36,19 @@ import ( // workerStatusHandler stores the communication information of WorkerStatus API. type workerStatusHandler struct { - conn *grpc.ClientConn - shouldShutdown int32 - wg sync.WaitGroup - metStore map[instructionID]*metrics.Store //*metrics.Store for active bundles - cache *statecache.SideInputCache + conn *grpc.ClientConn + shouldShutdown int32 + wg sync.WaitGroup + cache *statecache.SideInputCache + metStoreToString func(*strings.Builder) } -func newWorkerStatusHandler(ctx context.Context, endpoint string, metStore map[instructionID]*metrics.Store, cache *statecache.SideInputCache) (*workerStatusHandler, error) { +func newWorkerStatusHandler(ctx context.Context, endpoint string, cache *statecache.SideInputCache, metStoreToString func(*strings.Builder)) (*workerStatusHandler, error) { sconn, err := dial(ctx, endpoint, 60*time.Second) if err != nil { return nil, errors.Wrapf(err, "failed to connect: %v\n", endpoint) } - return &workerStatusHandler{conn: sconn, shouldShutdown: 0, metStore: metStore, cache: cache}, nil + return &workerStatusHandler{conn: sconn, shouldShutdown: 0, cache: cache, metStoreToString: metStoreToString}, nil } func (w *workerStatusHandler) isAlive() bool { @@ -70,33 +72,40 @@ func (w *workerStatusHandler) start(ctx context.Context) error { return nil } -func memoryUsage() string { +func memoryUsage(statusInfo *strings.Builder) { + statusInfo.WriteString("\n============Memory Usage============\n") m := runtime.MemStats{} runtime.ReadMemStats(&m) - return fmt.Sprintf("\n Total Alloc: %v bytes \n Sys: %v bytes \n Mallocs: %v\n Frees: %v\n HeapAlloc: %v bytes", m.TotalAlloc, m.Sys, m.Mallocs, m.Frees, m.HeapAlloc) + statusInfo.WriteString(fmt.Sprintf("heap in-use-spans/allocated/total/max = %d/%d/%d/%d MB\n", m.HeapInuse>>20, m.HeapAlloc>>20, m.TotalAlloc>>20, m.HeapSys>>20)) + statusInfo.WriteString(fmt.Sprintf("stack in-use-spans/max = %d/%d MB\n", m.StackInuse>>20, m.StackSys>>20)) + statusInfo.WriteString(fmt.Sprintf("GC-CPU percentage = %.2f %%\n", m.GCCPUFraction*100)) + statusInfo.WriteString(fmt.Sprintf("Last GC time: %v\n", time.Unix(0, int64(m.LastGC)))) + statusInfo.WriteString(fmt.Sprintf("Next GC: %v MB\n", m.NextGC>>20)) } -func (w *workerStatusHandler) activeProcessBundleStates() string { - var states string - for bundleID, store := range w.metStore { - execStates := "" - for bundleID, state := range store.StateRegistry() { - execStates += fmt.Sprintf("ID: %v Execution States: %#v,", bundleID, *state) +func (w *workerStatusHandler) activeProcessBundleStates(statusInfo *strings.Builder) { + statusInfo.WriteString("\n============Active Process Bundle States============\n") + w.metStoreToString(statusInfo) +} - } - states += fmt.Sprintf("\nBundle ID: %v\nBundle State: %#v\nBundle Execution States: %v\n", bundleID, *store.BundleState(), execStates) - } - return states +func (w *workerStatusHandler) cacheStats(statusInfo *strings.Builder) { + statusInfo.WriteString("\n============Cache Stats============\n") + statusInfo.WriteString(fmt.Sprintf("State Cache:\n%+v\n", w.cache.CacheMetrics())) } -func (w *workerStatusHandler) cacheStats() string { - return fmt.Sprintf("Cache:\n%v", w.cache.CacheMetrics()) +func goroutineDump(statusInfo *strings.Builder) { + statusInfo.WriteString("\n============Goroutine Dump============\n") + profile := pprof.Lookup("goroutine") + if profile != nil { + profile.WriteTo(statusInfo, 1) + } } -func goroutineDump() string { - buf := make([]byte, 1<<16) - runtime.Stack(buf, true) - return string(buf) +func buildInfo(statusInfo *strings.Builder) { + statusInfo.WriteString("\n============Build Info============\n") + if info, ok := debug.ReadBuildInfo(); ok { + statusInfo.WriteString(info.String()) + } } // reader reads the WorkerStatusRequest from the stream and sends a processed WorkerStatusResponse to @@ -112,8 +121,14 @@ func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorker } log.Debugf(ctx, "RECV-status: %v", req.GetId()) - statusInfo := fmt.Sprintf("\n============Memory Usage============\n%s\n============Active Process Bundle States============\n%s\n============Cache Stats============\n%s\n============Goroutine Dump============\n%s\n", memoryUsage(), w.activeProcessBundleStates(), w.cacheStats(), goroutineDump()) - response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: statusInfo} + statusInfo := &strings.Builder{} + memoryUsage(statusInfo) + w.activeProcessBundleStates(statusInfo) + w.cacheStats(statusInfo) + goroutineDump(statusInfo) + buildInfo(statusInfo) + + response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: statusInfo.String()} if err := stub.Send(response); err != nil && err != io.EOF { log.Errorf(ctx, "workerStatus.Writer: Failed to respond: %v", err) } diff --git a/sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go b/sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go index e0a6f7ef53e1..3c26b63ea34c 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go +++ b/sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go @@ -21,6 +21,7 @@ import ( "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache" "log" "net" + "strings" "testing" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" @@ -76,7 +77,9 @@ func TestSendStatusResponse(t *testing.T) { t.Fatalf("unable to start test server: %v", err) } - statusHandler := workerStatusHandler{conn: conn, cache: &statecache.SideInputCache{}} + statusHandler := workerStatusHandler{conn: conn, cache: &statecache.SideInputCache{}, metStoreToString: func(builder *strings.Builder) { + builder.WriteString("metStore metadata") + }} if err := statusHandler.start(ctx); err != nil { t.Fatal(err) } diff --git a/sdks/go/test/integration/wordcount/wordcount.go b/sdks/go/test/integration/wordcount/wordcount.go index 0e7215f4c0f8..b73c0334394b 100644 --- a/sdks/go/test/integration/wordcount/wordcount.go +++ b/sdks/go/test/integration/wordcount/wordcount.go @@ -19,12 +19,13 @@ package wordcount import ( "context" "fmt" + "regexp" + "strings" + "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert" "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats" - "regexp" - "strings" ) var ( From 976645397a0cebfc70f09e89e994896ce383ebec Mon Sep 17 00:00:00 2001 From: riteshghorse Date: Mon, 13 Jun 2022 10:42:48 -0400 Subject: [PATCH 6/6] rm blank line --- sdks/go/test/integration/wordcount/wordcount.go | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/go/test/integration/wordcount/wordcount.go b/sdks/go/test/integration/wordcount/wordcount.go index b73c0334394b..483f50285090 100644 --- a/sdks/go/test/integration/wordcount/wordcount.go +++ b/sdks/go/test/integration/wordcount/wordcount.go @@ -80,6 +80,7 @@ func formatFn(w string, c int) string { // WordCount returns a self-validating wordcount pipeline. func WordCount(glob, hash string, size int) *beam.Pipeline { p, s := beam.NewPipelineWithRoot() + in := textio.Read(s, glob) WordCountFromPCol(s, in, hash, size) return p