Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Framework/Core/include/Framework/DriverInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ struct DriverInfo {
/// The unique id used for ipc communications
std::string uniqueWorkflowId = "";
/// Metrics gathering interval
unsigned short resourcesMonitoringInterval;
unsigned short resourcesMonitoringInterval = 0;
/// Metrics gathering dump to disk interval
unsigned short resourcesMonitoringDumpInterval = 0;
/// Port used by the websocket control. 0 means not initialised.
unsigned short port = 0;
/// Last port used for tracy
Expand Down
94 changes: 60 additions & 34 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1195,6 +1195,20 @@ void single_step_callback(uv_timer_s* ctx)
killChildren(*infos, SIGUSR1);
}

void dumpMetricsCallback(uv_timer_t* handle)
{
DriverServerContext* context = (DriverServerContext*)handle->data;

auto performanceMetrics = o2::monitoring::ProcessMonitor::getAvailableMetricsNames();
performanceMetrics.push_back("arrow-bytes-delta");
performanceMetrics.push_back("aod-bytes-read-uncompressed");
performanceMetrics.push_back("aod-bytes-read-compressed");
performanceMetrics.push_back("aod-file-read-info");
performanceMetrics.push_back("table-bytes-.*");
ResourcesMonitoringHelper::dumpMetricsToJSON(*(context->metrics),
context->driver->metrics, *(context->specs), performanceMetrics);
}

// This is the handler for the parent inner loop.
int runStateMachine(DataProcessorSpecs const& workflow,
WorkflowInfo const& workflowInfo,
Expand Down Expand Up @@ -1352,6 +1366,9 @@ int runStateMachine(DataProcessorSpecs const& workflow,
bool guiDeployedOnce = false;
bool once = false;

uv_timer_t metricDumpTimer;
metricDumpTimer.data = &serverContext;

while (true) {
// If control forced some transition on us, we push it to the queue.
if (driverControl.forcedTransitions.empty() == false) {
Expand Down Expand Up @@ -1570,6 +1587,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
"--fairmq-ipc-prefix",
"--readers",
"--resources-monitoring",
"--resources-monitoring-dump-interval",
"--time-limit",
};

Expand Down Expand Up @@ -1642,6 +1660,15 @@ int runStateMachine(DataProcessorSpecs const& workflow,
callback(serviceRegistry, varmap);
}
assert(infos.empty() == false);

// In case resource monitoring is requested, we dump metrics to disk
// every 3 minutes.
if (driverInfo.resourcesMonitoringDumpInterval && ResourcesMonitoringHelper::isResourcesMonitoringEnabled(driverInfo.resourcesMonitoringInterval)) {
uv_timer_init(loop, &metricDumpTimer);
uv_timer_start(&metricDumpTimer, dumpMetricsCallback,
driverInfo.resourcesMonitoringDumpInterval * 1000,
driverInfo.resourcesMonitoringDumpInterval * 1000);
}
LOG(INFO) << "Redeployment of configuration done.";
} break;
case DriverState::RUNNING:
Expand Down Expand Up @@ -1757,14 +1784,11 @@ int runStateMachine(DataProcessorSpecs const& workflow,
} break;
case DriverState::EXIT: {
if (ResourcesMonitoringHelper::isResourcesMonitoringEnabled(driverInfo.resourcesMonitoringInterval)) {
if (driverInfo.resourcesMonitoringDumpInterval) {
uv_timer_stop(&metricDumpTimer);
}
LOG(INFO) << "Dumping performance metrics to performanceMetrics.json file";
auto performanceMetrics = o2::monitoring::ProcessMonitor::getAvailableMetricsNames();
performanceMetrics.push_back("arrow-bytes-delta");
performanceMetrics.push_back("aod-bytes-read-uncompressed");
performanceMetrics.push_back("aod-bytes-read-compressed");
performanceMetrics.push_back("aod-file-read-info");
performanceMetrics.push_back("table-bytes-.*");
ResourcesMonitoringHelper::dumpMetricsToJSON(metricsInfos, driverInfo.metrics, runningWorkflow.devices, performanceMetrics);
dumpMetricsCallback(&metricDumpTimer);
}
// This is a clean exit. Before we do so, if required,
// we dump the configuration of all the devices so that
Expand Down Expand Up @@ -2157,33 +2181,34 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
enum LogParsingHelpers::LogLevel minFailureLevel;
bpo::options_description executorOptions("Executor options");
const char* helpDescription = "print help: short, full, executor, or processor name";
executorOptions.add_options() //
("help,h", bpo::value<std::string>()->implicit_value("short"), helpDescription) // //
("quiet,q", bpo::value<bool>()->zero_tokens()->default_value(false), "quiet operation") // //
("stop,s", bpo::value<bool>()->zero_tokens()->default_value(false), "stop before device start") // //
("single-step", bpo::value<bool>()->zero_tokens()->default_value(false), "start in single step mode") // //
("batch,b", bpo::value<bool>()->zero_tokens()->default_value(isatty(fileno(stdout)) == 0), "batch processing mode") // //
("no-batch", bpo::value<bool>()->zero_tokens()->default_value(false), "force gui processing mode") // //
("no-cleanup", bpo::value<bool>()->zero_tokens()->default_value(false), "do not cleanup the shm segment") // //
("hostname", bpo::value<std::string>()->default_value("localhost"), "hostname to deploy") // //
("resources", bpo::value<std::string>()->default_value(""), "resources allocated for the workflow") // //
("start-port,p", bpo::value<unsigned short>()->default_value(22000), "start port to allocate") // //
("port-range,pr", bpo::value<unsigned short>()->default_value(1000), "ports in range") // //
("completion-policy,c", bpo::value<TerminationPolicy>(&policy)->default_value(TerminationPolicy::QUIT), // //
"what to do when processing is finished: quit, wait") // //
("error-policy", bpo::value<TerminationPolicy>(&errorPolicy)->default_value(TerminationPolicy::QUIT), // //
"what to do when a device has an error: quit, wait") // //
("min-failure-level", bpo::value<LogParsingHelpers::LogLevel>(&minFailureLevel)->default_value(LogParsingHelpers::LogLevel::Fatal), // //
"minimum message level which will be considered as fatal and exit with 1") // //
("graphviz,g", bpo::value<bool>()->zero_tokens()->default_value(false), "produce graph output") // //
("timeout,t", bpo::value<uint64_t>()->default_value(0), "forced exit timeout (in seconds)") // //
("dds,D", bpo::value<bool>()->zero_tokens()->default_value(false), "create DDS configuration") // //
("dump-workflow,dump", bpo::value<bool>()->zero_tokens()->default_value(false), "dump workflow as JSON") // //
("dump-workflow-file", bpo::value<std::string>()->default_value("-"), "file to which do the dump") // //
("run", bpo::value<bool>()->zero_tokens()->default_value(false), "run workflow merged so far") // //
("no-IPC", bpo::value<bool>()->zero_tokens()->default_value(false), "disable IPC topology optimization") // //
("o2-control,o2", bpo::value<std::string>()->default_value(""), "dump O2 Control workflow configuration under the specified name") //
("resources-monitoring", bpo::value<unsigned short>()->default_value(0), "enable cpu/memory monitoring for provided interval in seconds"); //
executorOptions.add_options() //
("help,h", bpo::value<std::string>()->implicit_value("short"), helpDescription) // //
("quiet,q", bpo::value<bool>()->zero_tokens()->default_value(false), "quiet operation") // //
("stop,s", bpo::value<bool>()->zero_tokens()->default_value(false), "stop before device start") // //
("single-step", bpo::value<bool>()->zero_tokens()->default_value(false), "start in single step mode") // //
("batch,b", bpo::value<bool>()->zero_tokens()->default_value(isatty(fileno(stdout)) == 0), "batch processing mode") // //
("no-batch", bpo::value<bool>()->zero_tokens()->default_value(false), "force gui processing mode") // //
("no-cleanup", bpo::value<bool>()->zero_tokens()->default_value(false), "do not cleanup the shm segment") // //
("hostname", bpo::value<std::string>()->default_value("localhost"), "hostname to deploy") // //
("resources", bpo::value<std::string>()->default_value(""), "resources allocated for the workflow") // //
("start-port,p", bpo::value<unsigned short>()->default_value(22000), "start port to allocate") // //
("port-range,pr", bpo::value<unsigned short>()->default_value(1000), "ports in range") // //
("completion-policy,c", bpo::value<TerminationPolicy>(&policy)->default_value(TerminationPolicy::QUIT), // //
"what to do when processing is finished: quit, wait") // //
("error-policy", bpo::value<TerminationPolicy>(&errorPolicy)->default_value(TerminationPolicy::QUIT), // //
"what to do when a device has an error: quit, wait") // //
("min-failure-level", bpo::value<LogParsingHelpers::LogLevel>(&minFailureLevel)->default_value(LogParsingHelpers::LogLevel::Fatal), // //
"minimum message level which will be considered as fatal and exit with 1") // //
("graphviz,g", bpo::value<bool>()->zero_tokens()->default_value(false), "produce graph output") // //
("timeout,t", bpo::value<uint64_t>()->default_value(0), "forced exit timeout (in seconds)") // //
("dds,D", bpo::value<bool>()->zero_tokens()->default_value(false), "create DDS configuration") // //
("dump-workflow,dump", bpo::value<bool>()->zero_tokens()->default_value(false), "dump workflow as JSON") // //
("dump-workflow-file", bpo::value<std::string>()->default_value("-"), "file to which do the dump") // //
("run", bpo::value<bool>()->zero_tokens()->default_value(false), "run workflow merged so far") // //
("no-IPC", bpo::value<bool>()->zero_tokens()->default_value(false), "disable IPC topology optimization") // //
("o2-control,o2", bpo::value<std::string>()->default_value(""), "dump O2 Control workflow configuration under the specified name") //
("resources-monitoring", bpo::value<unsigned short>()->default_value(0), "enable cpu/memory monitoring for provided interval in seconds") //
("resources-monitoring-dump-interval", bpo::value<unsigned short>()->default_value(0), "dump monitoring information to disk every provided seconds"); //
// some of the options must be forwarded by default to the device
executorOptions.add(DeviceSpecHelpers::getForwardedDeviceOptions());

Expand Down Expand Up @@ -2401,6 +2426,7 @@ int doMain(int argc, char** argv, o2::framework::WorkflowSpec const& workflow,
driverInfo.deployHostname = varmap["hostname"].as<std::string>();
driverInfo.resources = varmap["resources"].as<std::string>();
driverInfo.resourcesMonitoringInterval = varmap["resources-monitoring"].as<unsigned short>();
driverInfo.resourcesMonitoringDumpInterval = varmap["resources-monitoring-dump-interval"].as<unsigned short>();

// FIXME: should use the whole dataProcessorInfos, actually...
driverInfo.processorInfo = dataProcessorInfos;
Expand Down