diff --git a/sdks/go/container/boot.go b/sdks/go/container/boot.go index b180f54bf182..7577f1488453 100644 --- a/sdks/go/container/boot.go +++ b/sdks/go/container/boot.go @@ -108,7 +108,7 @@ func main() { "--options=" + options, } if info.GetStatusEndpoint() != nil { - args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl()) + os.Setenv("STATUS_ENDPOINT", info.GetStatusEndpoint().GetUrl()) } if len(info.GetRunnerCapabilities()) > 0 { diff --git a/sdks/go/pkg/beam/core/metrics/store.go b/sdks/go/pkg/beam/core/metrics/store.go index 7e2240a188ec..2608b4792dee 100644 --- a/sdks/go/pkg/beam/core/metrics/store.go +++ b/sdks/go/pkg/beam/core/metrics/store.go @@ -17,8 +17,11 @@ package metrics import ( "fmt" + "strings" "sync" + "sync/atomic" "time" + "unsafe" ) // Implementation note: We avoid depending on the FnAPI protos here @@ -156,6 +159,22 @@ type ptCounterSet struct { type bundleProcState int +// String implements the Stringer interface. +func (b bundleProcState) String() string { + switch b { + case StartBundle: + return "START_BUNDLE" + case ProcessBundle: + return "PROCESS_BUNDLE" + case FinishBundle: + return "FINISH_BUNDLE" + case TotalBundle: + return "TOTAL_BUNDLE" + default: + return "unknown process bundle state!" + } +} + const ( // StartBundle indicates starting state of a bundle StartBundle bundleProcState = 0 @@ -174,12 +193,22 @@ type ExecutionState struct { TotalTime time.Duration } +// String implements the Stringer interface. +func (e ExecutionState) String() string { + return fmt.Sprintf("Execution State:\n\t State: %s\n\t IsProcessing: %v\n\t Total time: %v\n", e.State, e.IsProcessing, e.TotalTime) +} + // BundleState stores information about a PTransform for execution time metrics. type BundleState struct { pid string currentState bundleProcState } +// String implements the Stringer interface. +func (b BundleState) String() string { + return fmt.Sprintf("Bundle State:\n\t PTransform ID: %s\n\t Current state: %s", b.pid, b.currentState) +} + // currentStateVal exports the current state of a bundle wrt PTransform. type currentStateVal struct { pid string @@ -218,3 +247,24 @@ func (b *Store) storeMetric(pid string, n name, m userMetric) { } b.store[l] = m } + +// BundleState returns the bundle state. +func (b *Store) BundleState() string { + bs := *(*BundleState)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&b.bundleState)))) + return bs.String() +} + +// StateRegistry returns the state registry that stores bundleID to executions states mapping. +func (b *Store) StateRegistry() string { + b.mu.Lock() + defer b.mu.Unlock() + builder := &strings.Builder{} + builder.WriteString("\n | All Bundle Process States | \n") + for bundleID, state := range b.stateRegistry { + builder.WriteString(fmt.Sprintf("\tBundle ID: %s\n", bundleID)) + for i := 0; i < 4; i++ { + builder.WriteString(fmt.Sprintf("\t%s\n", state[i])) + } + } + return builder.String() +} diff --git a/sdks/go/pkg/beam/core/runtime/harness/harness.go b/sdks/go/pkg/beam/core/runtime/harness/harness.go index 4ed4cf6d4b23..5741ae81e7fe 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/harness.go +++ b/sdks/go/pkg/beam/core/runtime/harness/harness.go @@ -39,9 +39,6 @@ import ( "google.golang.org/protobuf/types/known/durationpb" ) -// StatusAddress is a type of status endpoint address as an optional argument to harness.Main(). -type StatusAddress string - // URNMonitoringInfoShortID is a URN indicating support for short monitoring info IDs. const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1" @@ -50,22 +47,14 @@ const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1" // Main is the main entrypoint for the Go harness. It runs at "runtime" -- not // "pipeline-construction time" -- on each worker. It is a FnAPI client and // ultimately responsible for correctly executing user code. -func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options ...interface{}) error { +func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error { hooks.DeserializeHooksFromOptions(ctx) - statusEndpoint := "" - for _, option := range options { - switch option := option.(type) { - case StatusAddress: - statusEndpoint = string(option) - default: - return errors.Errorf("unknown type %T, value %v in error call", option, option) - } - } - // Extract environment variables. These are optional runner supported capabilities. // Expected env variables: // RUNNER_CAPABILITIES : list of runner supported capability urn. + // STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting. + statusEndpoint := os.Getenv("STATUS_ENDPOINT") runnerCapabilities := strings.Split(os.Getenv("RUNNER_CAPABILITIES"), " ") rcMap := make(map[string]bool) if len(runnerCapabilities) > 0 { @@ -128,18 +117,6 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options log.Debugf(ctx, "control response channel closed") }() - // if the runner supports worker status api then expose SDK harness status - if statusEndpoint != "" { - statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint) - if err != nil { - log.Errorf(ctx, "error establishing connection to worker status API: %v", err) - } else { - if err := statusHandler.start(ctx); err == nil { - defer statusHandler.stop(ctx) - } - } - } - sideCache := statecache.SideInputCache{} sideCache.Init(cacheSize) @@ -157,6 +134,19 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options cache: &sideCache, runnerCapabilities: rcMap, } + + // if the runner supports worker status api then expose SDK harness status + if statusEndpoint != "" { + statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint, ctrl.cache, func(statusInfo *strings.Builder) { ctrl.metStoreToString(statusInfo) }) + if err != nil { + log.Errorf(ctx, "error establishing connection to worker status API: %v", err) + } else { + if err := statusHandler.start(ctx); err == nil { + defer statusHandler.stop(ctx) + } + } + } + // gRPC requires all readers of a stream be the same goroutine, so this goroutine // is responsible for managing the network data. All it does is pull data from // the stream, and hand off the message to a goroutine to actually be handled, @@ -296,6 +286,16 @@ type control struct { runnerCapabilities map[string]bool } +func (c *control) metStoreToString(statusInfo *strings.Builder) { + c.mu.Lock() + defer c.mu.Unlock() + for bundleID, store := range c.metStore { + statusInfo.WriteString(fmt.Sprintf("Bundle ID: %v\n", bundleID)) + statusInfo.WriteString(fmt.Sprintf("\t%s", store.BundleState())) + statusInfo.WriteString(fmt.Sprintf("\t%s", store.StateRegistry())) + } +} + func (c *control) getOrCreatePlan(bdID bundleDescriptorID) (*exec.Plan, error) { c.mu.Lock() plans, ok := c.plans[bdID] diff --git a/sdks/go/pkg/beam/core/runtime/harness/init/init.go b/sdks/go/pkg/beam/core/runtime/harness/init/init.go index fb9e1321edbd..3f8e2336f5be 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/init/init.go +++ b/sdks/go/pkg/beam/core/runtime/harness/init/init.go @@ -42,7 +42,6 @@ var ( id = flag.String("id", "", "Local identifier (required in worker mode).") loggingEndpoint = flag.String("logging_endpoint", "", "Local logging gRPC endpoint (required in worker mode).") controlEndpoint = flag.String("control_endpoint", "", "Local control gRPC endpoint (required in worker mode).") - statusEndpoint = flag.String("status_endpoint", "", "Local status gRPC endpoint (optional in worker mode).") //lint:ignore U1000 semiPersistDir flag is passed in through the boot container, will need to be removed later semiPersistDir = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional in worker mode).") options = flag.String("options", "", "JSON-encoded pipeline options (required in worker mode).") @@ -108,7 +107,7 @@ func hook() { // does, and establish the background context here. ctx := grpcx.WriteWorkerID(context.Background(), *id) - if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint, harness.StatusAddress(*statusEndpoint)); err != nil { + if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint); err != nil { fmt.Fprintf(os.Stderr, "Worker failed: %v\n", err) switch ShutdownMode { case Terminate: diff --git a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go index 73430869b2e9..cdbf54ef77d3 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go +++ b/sdks/go/pkg/beam/core/runtime/harness/statecache/statecache.go @@ -272,3 +272,10 @@ func (c *SideInputCache) evictElement(ctx context.Context) { } } } + +// CacheMetrics returns the cache metrics for current side input cache. +func (c *SideInputCache) CacheMetrics() CacheMetrics { + c.mu.Lock() + defer c.mu.Unlock() + return c.metrics +} diff --git a/sdks/go/pkg/beam/core/runtime/harness/worker_status.go b/sdks/go/pkg/beam/core/runtime/harness/worker_status.go index bf5a9a9099b1..e4ce4b7e71a8 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/worker_status.go +++ b/sdks/go/pkg/beam/core/runtime/harness/worker_status.go @@ -17,8 +17,13 @@ package harness import ( "context" + "fmt" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache" "io" "runtime" + "runtime/debug" + "runtime/pprof" + "strings" "sync" "sync/atomic" "time" @@ -31,17 +36,19 @@ import ( // workerStatusHandler stores the communication information of WorkerStatus API. type workerStatusHandler struct { - conn *grpc.ClientConn - shouldShutdown int32 - wg sync.WaitGroup + conn *grpc.ClientConn + shouldShutdown int32 + wg sync.WaitGroup + cache *statecache.SideInputCache + metStoreToString func(*strings.Builder) } -func newWorkerStatusHandler(ctx context.Context, endpoint string) (*workerStatusHandler, error) { +func newWorkerStatusHandler(ctx context.Context, endpoint string, cache *statecache.SideInputCache, metStoreToString func(*strings.Builder)) (*workerStatusHandler, error) { sconn, err := dial(ctx, endpoint, 60*time.Second) if err != nil { return nil, errors.Wrapf(err, "failed to connect: %v\n", endpoint) } - return &workerStatusHandler{conn: sconn, shouldShutdown: 0}, nil + return &workerStatusHandler{conn: sconn, shouldShutdown: 0, cache: cache, metStoreToString: metStoreToString}, nil } func (w *workerStatusHandler) isAlive() bool { @@ -65,11 +72,47 @@ func (w *workerStatusHandler) start(ctx context.Context) error { return nil } +func memoryUsage(statusInfo *strings.Builder) { + statusInfo.WriteString("\n============Memory Usage============\n") + m := runtime.MemStats{} + runtime.ReadMemStats(&m) + statusInfo.WriteString(fmt.Sprintf("heap in-use-spans/allocated/total/max = %d/%d/%d/%d MB\n", m.HeapInuse>>20, m.HeapAlloc>>20, m.TotalAlloc>>20, m.HeapSys>>20)) + statusInfo.WriteString(fmt.Sprintf("stack in-use-spans/max = %d/%d MB\n", m.StackInuse>>20, m.StackSys>>20)) + statusInfo.WriteString(fmt.Sprintf("GC-CPU percentage = %.2f %%\n", m.GCCPUFraction*100)) + statusInfo.WriteString(fmt.Sprintf("Last GC time: %v\n", time.Unix(0, int64(m.LastGC)))) + statusInfo.WriteString(fmt.Sprintf("Next GC: %v MB\n", m.NextGC>>20)) +} + +func (w *workerStatusHandler) activeProcessBundleStates(statusInfo *strings.Builder) { + statusInfo.WriteString("\n============Active Process Bundle States============\n") + w.metStoreToString(statusInfo) +} + +func (w *workerStatusHandler) cacheStats(statusInfo *strings.Builder) { + statusInfo.WriteString("\n============Cache Stats============\n") + statusInfo.WriteString(fmt.Sprintf("State Cache:\n%+v\n", w.cache.CacheMetrics())) +} + +func goroutineDump(statusInfo *strings.Builder) { + statusInfo.WriteString("\n============Goroutine Dump============\n") + profile := pprof.Lookup("goroutine") + if profile != nil { + profile.WriteTo(statusInfo, 1) + } +} + +func buildInfo(statusInfo *strings.Builder) { + statusInfo.WriteString("\n============Build Info============\n") + if info, ok := debug.ReadBuildInfo(); ok { + statusInfo.WriteString(info.String()) + } +} + // reader reads the WorkerStatusRequest from the stream and sends a processed WorkerStatusResponse to // a response channel. func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorkerStatus_WorkerStatusClient) { defer w.wg.Done() - buf := make([]byte, 1<<16) + for w.isAlive() { req, err := stub.Recv() if err != nil && err != io.EOF { @@ -77,8 +120,15 @@ func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorker return } log.Debugf(ctx, "RECV-status: %v", req.GetId()) - runtime.Stack(buf, true) - response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: string(buf)} + + statusInfo := &strings.Builder{} + memoryUsage(statusInfo) + w.activeProcessBundleStates(statusInfo) + w.cacheStats(statusInfo) + goroutineDump(statusInfo) + buildInfo(statusInfo) + + response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: statusInfo.String()} if err := stub.Send(response); err != nil && err != io.EOF { log.Errorf(ctx, "workerStatus.Writer: Failed to respond: %v", err) } diff --git a/sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go b/sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go index 6fc6ab0ca91d..3c26b63ea34c 100644 --- a/sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go +++ b/sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go @@ -18,8 +18,10 @@ package harness import ( "context" "fmt" + "github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache" "log" "net" + "strings" "testing" fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1" @@ -75,7 +77,9 @@ func TestSendStatusResponse(t *testing.T) { t.Fatalf("unable to start test server: %v", err) } - statusHandler := workerStatusHandler{conn: conn} + statusHandler := workerStatusHandler{conn: conn, cache: &statecache.SideInputCache{}, metStoreToString: func(builder *strings.Builder) { + builder.WriteString("metStore metadata") + }} if err := statusHandler.start(ctx); err != nil { t.Fatal(err) } diff --git a/sdks/go/test/integration/wordcount/wordcount.go b/sdks/go/test/integration/wordcount/wordcount.go index ee18cf688943..483f50285090 100644 --- a/sdks/go/test/integration/wordcount/wordcount.go +++ b/sdks/go/test/integration/wordcount/wordcount.go @@ -18,11 +18,10 @@ package wordcount import ( "context" + "fmt" "regexp" "strings" - "fmt" - "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio" "github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"