diff --git a/Framework/Core/include/Framework/DriverInfo.h b/Framework/Core/include/Framework/DriverInfo.h index 530f5d2d4b17d..8cec4b70ff46c 100644 --- a/Framework/Core/include/Framework/DriverInfo.h +++ b/Framework/Core/include/Framework/DriverInfo.h @@ -143,7 +143,9 @@ struct DriverInfo { /// The unique id used for ipc communications std::string uniqueWorkflowId = ""; /// Metrics gathering interval - unsigned short resourcesMonitoringInterval; + unsigned short resourcesMonitoringInterval = 0; + /// Metrics gathering dump to disk interval + unsigned short resourcesMonitoringDumpInterval = 0; /// Port used by the websocket control. 0 means not initialised. unsigned short port = 0; /// Last port used for tracy diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 08d6dc3b446d5..c53b181d79385 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1195,6 +1195,20 @@ void single_step_callback(uv_timer_s* ctx) killChildren(*infos, SIGUSR1); } +void dumpMetricsCallback(uv_timer_t* handle) +{ + DriverServerContext* context = (DriverServerContext*)handle->data; + + auto performanceMetrics = o2::monitoring::ProcessMonitor::getAvailableMetricsNames(); + performanceMetrics.push_back("arrow-bytes-delta"); + performanceMetrics.push_back("aod-bytes-read-uncompressed"); + performanceMetrics.push_back("aod-bytes-read-compressed"); + performanceMetrics.push_back("aod-file-read-info"); + performanceMetrics.push_back("table-bytes-.*"); + ResourcesMonitoringHelper::dumpMetricsToJSON(*(context->metrics), + context->driver->metrics, *(context->specs), performanceMetrics); +} + // This is the handler for the parent inner loop. int runStateMachine(DataProcessorSpecs const& workflow, WorkflowInfo const& workflowInfo, @@ -1352,6 +1366,9 @@ int runStateMachine(DataProcessorSpecs const& workflow, bool guiDeployedOnce = false; bool once = false; + uv_timer_t metricDumpTimer; + metricDumpTimer.data = &serverContext; + while (true) { // If control forced some transition on us, we push it to the queue. if (driverControl.forcedTransitions.empty() == false) { @@ -1570,6 +1587,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, "--fairmq-ipc-prefix", "--readers", "--resources-monitoring", + "--resources-monitoring-dump-interval", "--time-limit", }; @@ -1642,6 +1660,15 @@ int runStateMachine(DataProcessorSpecs const& workflow, callback(serviceRegistry, varmap); } assert(infos.empty() == false); + + // In case resource monitoring is requested, we dump metrics to disk + // every 3 minutes. + if (driverInfo.resourcesMonitoringDumpInterval && ResourcesMonitoringHelper::isResourcesMonitoringEnabled(driverInfo.resourcesMonitoringInterval)) { + uv_timer_init(loop, &metricDumpTimer); + uv_timer_start(&metricDumpTimer, dumpMetricsCallback, + driverInfo.resourcesMonitoringDumpInterval * 1000, + driverInfo.resourcesMonitoringDumpInterval * 1000); + } LOG(INFO) << "Redeployment of configuration done."; } break; case DriverState::RUNNING: @@ -1757,14 +1784,11 @@ int runStateMachine(DataProcessorSpecs const& workflow, } break; case DriverState::EXIT: { if (ResourcesMonitoringHelper::isResourcesMonitoringEnabled(driverInfo.resourcesMonitoringInterval)) { + if (driverInfo.resourcesMonitoringDumpInterval) { + uv_timer_stop(&metricDumpTimer); + } LOG(INFO) << "Dumping performance metrics to performanceMetrics.json file"; - auto performanceMetrics = o2::monitoring::ProcessMonitor::getAvailableMetricsNames(); - performanceMetrics.push_back("arrow-bytes-delta"); - performanceMetrics.push_back("aod-bytes-read-uncompressed"); - performanceMetrics.push_back("aod-bytes-read-compressed"); - performanceMetrics.push_back("aod-file-read-info"); - performanceMetrics.push_back("table-bytes-.*"); - ResourcesMonitoringHelper::dumpMetricsToJSON(metricsInfos, driverInfo.metrics, runningWorkflow.devices, performanceMetrics); + dumpMetricsCallback(&metricDumpTimer); } // This is a clean exit. Before we do so, if required, // we dump the configuration of all the devices so that @@ -2157,33 +2181,34 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, enum LogParsingHelpers::LogLevel minFailureLevel; bpo::options_description executorOptions("Executor options"); const char* helpDescription = "print help: short, full, executor, or processor name"; - executorOptions.add_options() // - ("help,h", bpo::value()->implicit_value("short"), helpDescription) // // - ("quiet,q", bpo::value()->zero_tokens()->default_value(false), "quiet operation") // // - ("stop,s", bpo::value()->zero_tokens()->default_value(false), "stop before device start") // // - ("single-step", bpo::value()->zero_tokens()->default_value(false), "start in single step mode") // // - ("batch,b", bpo::value()->zero_tokens()->default_value(isatty(fileno(stdout)) == 0), "batch processing mode") // // - ("no-batch", bpo::value()->zero_tokens()->default_value(false), "force gui processing mode") // // - ("no-cleanup", bpo::value()->zero_tokens()->default_value(false), "do not cleanup the shm segment") // // - ("hostname", bpo::value()->default_value("localhost"), "hostname to deploy") // // - ("resources", bpo::value()->default_value(""), "resources allocated for the workflow") // // - ("start-port,p", bpo::value()->default_value(22000), "start port to allocate") // // - ("port-range,pr", bpo::value()->default_value(1000), "ports in range") // // - ("completion-policy,c", bpo::value(&policy)->default_value(TerminationPolicy::QUIT), // // - "what to do when processing is finished: quit, wait") // // - ("error-policy", bpo::value(&errorPolicy)->default_value(TerminationPolicy::QUIT), // // - "what to do when a device has an error: quit, wait") // // - ("min-failure-level", bpo::value(&minFailureLevel)->default_value(LogParsingHelpers::LogLevel::Fatal), // // - "minimum message level which will be considered as fatal and exit with 1") // // - ("graphviz,g", bpo::value()->zero_tokens()->default_value(false), "produce graph output") // // - ("timeout,t", bpo::value()->default_value(0), "forced exit timeout (in seconds)") // // - ("dds,D", bpo::value()->zero_tokens()->default_value(false), "create DDS configuration") // // - ("dump-workflow,dump", bpo::value()->zero_tokens()->default_value(false), "dump workflow as JSON") // // - ("dump-workflow-file", bpo::value()->default_value("-"), "file to which do the dump") // // - ("run", bpo::value()->zero_tokens()->default_value(false), "run workflow merged so far") // // - ("no-IPC", bpo::value()->zero_tokens()->default_value(false), "disable IPC topology optimization") // // - ("o2-control,o2", bpo::value()->default_value(""), "dump O2 Control workflow configuration under the specified name") // - ("resources-monitoring", bpo::value()->default_value(0), "enable cpu/memory monitoring for provided interval in seconds"); // + executorOptions.add_options() // + ("help,h", bpo::value()->implicit_value("short"), helpDescription) // // + ("quiet,q", bpo::value()->zero_tokens()->default_value(false), "quiet operation") // // + ("stop,s", bpo::value()->zero_tokens()->default_value(false), "stop before device start") // // + ("single-step", bpo::value()->zero_tokens()->default_value(false), "start in single step mode") // // + ("batch,b", bpo::value()->zero_tokens()->default_value(isatty(fileno(stdout)) == 0), "batch processing mode") // // + ("no-batch", bpo::value()->zero_tokens()->default_value(false), "force gui processing mode") // // + ("no-cleanup", bpo::value()->zero_tokens()->default_value(false), "do not cleanup the shm segment") // // + ("hostname", bpo::value()->default_value("localhost"), "hostname to deploy") // // + ("resources", bpo::value()->default_value(""), "resources allocated for the workflow") // // + ("start-port,p", bpo::value()->default_value(22000), "start port to allocate") // // + ("port-range,pr", bpo::value()->default_value(1000), "ports in range") // // + ("completion-policy,c", bpo::value(&policy)->default_value(TerminationPolicy::QUIT), // // + "what to do when processing is finished: quit, wait") // // + ("error-policy", bpo::value(&errorPolicy)->default_value(TerminationPolicy::QUIT), // // + "what to do when a device has an error: quit, wait") // // + ("min-failure-level", bpo::value(&minFailureLevel)->default_value(LogParsingHelpers::LogLevel::Fatal), // // + "minimum message level which will be considered as fatal and exit with 1") // // + ("graphviz,g", bpo::value()->zero_tokens()->default_value(false), "produce graph output") // // + ("timeout,t", bpo::value()->default_value(0), "forced exit timeout (in seconds)") // // + ("dds,D", bpo::value()->zero_tokens()->default_value(false), "create DDS configuration") // // + ("dump-workflow,dump", bpo::value()->zero_tokens()->default_value(false), "dump workflow as JSON") // // + ("dump-workflow-file", bpo::value()->default_value("-"), "file to which do the dump") // // + ("run", bpo::value()->zero_tokens()->default_value(false), "run workflow merged so far") // // + ("no-IPC", bpo::value()->zero_tokens()->default_value(false), "disable IPC topology optimization") // // + ("o2-control,o2", bpo::value()->default_value(""), "dump O2 Control workflow configuration under the specified name") // + ("resources-monitoring", bpo::value()->default_value(0), "enable cpu/memory monitoring for provided interval in seconds") // + ("resources-monitoring-dump-interval", bpo::value()->default_value(0), "dump monitoring information to disk every provided seconds"); // // some of the options must be forwarded by default to the device executorOptions.add(DeviceSpecHelpers::getForwardedDeviceOptions()); @@ -2401,6 +2426,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow, driverInfo.deployHostname = varmap["hostname"].as(); driverInfo.resources = varmap["resources"].as(); driverInfo.resourcesMonitoringInterval = varmap["resources-monitoring"].as(); + driverInfo.resourcesMonitoringDumpInterval = varmap["resources-monitoring-dump-interval"].as(); // FIXME: should use the whole dataProcessorInfos, actually... driverInfo.processorInfo = dataProcessorInfos;