Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ release.
</tr>
<tr>
<td valign="top">
<b><a href="doc/changelogs/CHANGELOG_V22.md#22.15.0">22.15.0</a></b><br/>
<b><a href="doc/changelogs/CHANGELOG_V22.md#22.15.1">22.15.1</a></b><br/>
<a href="doc/changelogs/CHANGELOG_V22.md#22.15.0">22.15.0</a><br/>
<a href="doc/changelogs/CHANGELOG_V22.md#22.14.0">22.14.0</a><br/>
<a href="doc/changelogs/CHANGELOG_V22.md#22.13.1">22.13.1</a><br/>
<a href="doc/changelogs/CHANGELOG_V22.md#22.13.0">22.13.0</a><br/>
Expand Down
4 changes: 1 addition & 3 deletions agents/grpc/proto/asset.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,5 @@ message Asset {
google.protobuf.Struct metadata = 3;
string data = 4;
bool complete = 5;
uint64 duration = 6; // Duration of the profile in milliseconds
double start_ts = 7; // Timestamp when the profile was taken in milliseconds from epoch
double end_ts = 8; // Timestamp when the profile was completed in milliseconds from epoch
uint64 duration = 6;
}
1 change: 0 additions & 1 deletion agents/grpc/proto/nsolid_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package grpcagent;
service NSolidService {
rpc Command (stream CommandResponse) returns (stream CommandRequest) {}
rpc ExportAsset (stream Asset) returns (EventResponse) {}
rpc ExportContinuousProfile (stream Asset) returns (EventResponse) {}
rpc ExportExit (ExitEvent) returns (EventResponse) {}
rpc ExportInfo (InfoEvent) returns (EventResponse) {}
rpc ExportMetrics (MetricsEvent) returns (EventResponse) {}
Expand Down
1 change: 0 additions & 1 deletion agents/grpc/proto/reconfigure.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ message ReconfigureBody {
repeated string tags = 9;
optional bool tracingEnabled = 10;
optional uint32 tracingModulesBlacklist = 11;
optional bool contCpuProfile = 12;
}

message ReconfigureEvent {
Expand Down
15 changes: 3 additions & 12 deletions agents/grpc/src/asset_stream.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,14 @@ AssetStream::AssetStream(
AssetStor&& stor,
std::weak_ptr<AssetStreamObserver> observer,
const std::string& agent_id,
const std::string& saas,
AssetStreamRpcType rpc_type): observer_(observer),
stor_(std::move(stor)) {
const std::string& saas): observer_(observer),
stor_(std::move(stor)) {
ASSERT_EQ(0, lock_.init(true));
context_.AddMetadata("nsolid-agent-id", agent_id);
if (!saas.empty()) {
context_.AddMetadata("nsolid-saas-token", saas);
}

// Call the appropriate RPC method based on the rpc_type parameter
if (rpc_type == EXPORT_CONTINUOUS_PROFILE) {
stub->async()->ExportContinuousProfile(&context_, &event_response_, this);
} else {
stub->async()->ExportAsset(&context_, &event_response_, this);
}

stub->async()->ExportAsset(&context_, &event_response_, this);
AddHold();
StartCall();
}
Expand Down Expand Up @@ -94,7 +86,6 @@ void AssetStream::Write(grpcagent::Asset&& asset) {

void AssetStream::WritesDone(bool) {
nsuv::ns_mutex::scoped_lock lock(lock_);
ASSERT(write_state_.write_done_called == false);
write_state_.write_done_called = true;
NextWrite();
}
Expand Down
9 changes: 1 addition & 8 deletions agents/grpc/src/asset_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@ namespace grpc {
// Predeclarations
class AssetStream;

// RPC type enum for specifying which RPC method to use
enum AssetStreamRpcType {
EXPORT_ASSET,
EXPORT_CONTINUOUS_PROFILE
};

struct AssetStor {
ProfileType type;
uint64_t thread_id;
Expand All @@ -47,8 +41,7 @@ class AssetStream: public ::grpc::ClientWriteReactor<grpcagent::Asset> {
AssetStor&& stor,
std::weak_ptr<AssetStreamObserver> observer,
const std::string& agent_id,
const std::string& saas,
AssetStreamRpcType rpc_type = EXPORT_ASSET);
const std::string& saas);

~AssetStream();

Expand Down
187 changes: 7 additions & 180 deletions agents/grpc/src/grpc_agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,6 @@ void PopulateReconfigureEvent(grpcagent::ReconfigureEvent* reconfigure_event,
if (it != config.end()) {
body->set_tracingmodulesblacklist(*it);
}
it = config.find("contCpuProfile");
if (it != config.end()) {
body->set_contcpuprofile(*it);
}
}

void PopulateStartupTimesEvent(grpcagent::StartupTimesEvent* st_events,
Expand Down Expand Up @@ -576,16 +572,12 @@ int GrpcAgent::stop(bool profile_stopped) {
} else {
Debug("Stopping gRPC Agent(2): %d\n", profile_stopped);
if (profile_stopped) {
auto it = config_.find("contCpuProfile");
if (it == config_.end() || it->get<bool>() == false) {
profile_on_exit_ = profile_stopped;
}

profile_on_exit_ = profile_stopped;
    // Wait here until there are no remaining profiles to be completed
uv_mutex_lock(&stop_lock_);
while (pending_profiles()) {
do {
uv_cond_wait(&stop_cond_, &stop_lock_);
}
} while (pending_profiles());

uv_mutex_unlock(&stop_lock_);
}
Expand Down Expand Up @@ -809,17 +801,6 @@ int GrpcAgent::start_heap_snapshot_from_js(
}
}

/*static*/ void GrpcAgent::cont_profiler_cb(
const ProfileCollector::ProfileQStor& profile_data,
WeakGrpcAgent agent_wp) {
SharedGrpcAgent agent = agent_wp.lock();
if (agent == nullptr) {
return;
}

agent->cont_profile_queue_->enqueue(profile_data);
}

void GrpcAgent::env_creation_cb_(SharedEnvInst envinst,
WeakGrpcAgent agent_wp) {
SharedGrpcAgent agent = agent_wp.lock();
Expand Down Expand Up @@ -1245,22 +1226,6 @@ void GrpcAgent::do_start() {
},
weak_from_this());

cont_profile_queue_ =
AsyncTSQueue<ProfileCollector::ProfileQStor>::create(
&loop_,
+[](ProfileCollector::ProfileQStor&& stor, WeakGrpcAgent agent_wp) {
SharedGrpcAgent agent = agent_wp.lock();
if (agent == nullptr) {
return;
}

agent->got_continuous_profile(std::move(stor));
},
weak_from_this());

EnvList::Inst()->GetContinuousProfiler()->RegisterHook(cont_profiler_cb,
weak_from_this());

ready_ = true;

if (hooks_init_ == false) {
Expand Down Expand Up @@ -1301,7 +1266,6 @@ void GrpcAgent::do_stop() {
env_msg_.close();
shutdown_.close();
start_profiling_msg_.close();
cont_profile_queue_.reset();
}

void GrpcAgent::got_spans(const UniqRecordables& recordables) {
Expand Down Expand Up @@ -1379,11 +1343,9 @@ void GrpcAgent::got_proc_metrics() {
void GrpcAgent::got_profile(const ProfileCollector::ProfileQStor& stor) {
google::protobuf::Struct metadata;
uint64_t thread_id;
uint64_t start_timestamp;
std::visit([&metadata, &thread_id, &start_timestamp](auto& opt) {
std::visit([&metadata, &thread_id](auto& opt) {
thread_id = opt.thread_id;
metadata = opt.metadata_pb;
start_timestamp = opt.start_timestamp;
}, stor.options);

nsuv::ns_mutex::scoped_lock lock(profile_state_lock_);
Expand Down Expand Up @@ -1430,15 +1392,14 @@ void GrpcAgent::got_profile(const ProfileCollector::ProfileQStor& stor) {
return;
}

const uint64_t duration = (uv_hrtime() - start_timestamp) / 1e6;
grpcagent::Asset asset;
PopulateCommon(asset.mutable_common(),
ProfileTypeStr[stor.type],
prof_stor.req_id.c_str());
asset.set_thread_id(thread_id);
asset.mutable_metadata()->CopyFrom(metadata);
asset.set_complete(true);
asset.set_duration(duration);
asset.set_duration(uv_now(&loop_) - prof_stor.timestamp);
prof_stor.stream->Write(std::move(asset));
prof_stor.stream->WritesDone();
} else {
Expand Down Expand Up @@ -1489,125 +1450,6 @@ void GrpcAgent::got_profile(const ProfileCollector::ProfileQStor& stor) {
}
}

void GrpcAgent::got_continuous_profile(
const ProfileCollector::ProfileQStor& stor) {
static double performance_process_start_timestamp =
performance::performance_process_start_timestamp / 1e3;
google::protobuf::Struct metadata;
uint64_t thread_id;
uint64_t start_timestamp;
std::visit([&metadata, &thread_id, &start_timestamp](auto& opt) {
thread_id = opt.thread_id;
metadata = opt.metadata_pb;
start_timestamp = opt.start_timestamp;
}, stor.options);

Debug("got_continuous_profile. len: %ld, status: %d. thread_id: %ld\n",
stor.profile.length(),
stor.status,
thread_id);

if (stor.status < 0) {
// Log error but don't send error message back for continuous profiles
auto error = translate_error(stor.status);
Debug("Continuous profile error: %d\n", static_cast<int>(error));
return;
}

// Look if entry in cont_profile_stor_map_. If not create one
auto it = cont_profile_stor_map_.find(thread_id);
if (it == cont_profile_stor_map_.end()) {
// Create a new stream for the continuous profile
AssetStream* stream = new AssetStream(nsolid_service_stub_.get(),
AssetStor{stor.type, thread_id},
weak_from_this(),
agent_id_,
saas(),
EXPORT_CONTINUOUS_PROFILE);
it = cont_profile_stor_map_.emplace(thread_id, ProfileStor{
utils::generate_unique_id(),
stream,
stor.options
}).first;
}

const ProfileStor& profile_stor = it->second;
const std::string& req_id = profile_stor.req_id;
AssetStream* stream = profile_stor.stream;

// Check if the profile is complete
bool profileStreamComplete = stor.profile.length() == 0;
if (profileStreamComplete) {
uint64_t now = uv_hrtime() - performance::performance_process_start;
uint64_t start = start_timestamp - performance::performance_process_start;
double start_ts =
performance_process_start_timestamp + start / 1e6;
double end_ts = performance_process_start_timestamp + now / 1e6;
uint64_t duration = (now - start) / 1e6;
// Create complete profile
grpcagent::Asset asset;
PopulateCommon(asset.mutable_common(),
ProfileTypeStr[stor.type],
req_id.c_str());
asset.set_thread_id(thread_id);
asset.mutable_metadata()->CopyFrom(metadata);
asset.set_complete(true);
asset.set_duration(duration);
asset.set_start_ts(start_ts);
asset.set_end_ts(end_ts);

// Remove the entry from the map
cont_profile_stor_map_.erase(it);

// Send the complete profile
stream->Write(std::move(asset));
stream->WritesDone();
return;
}

// Send profile chunks
grpcagent::Asset asset;
PopulateCommon(asset.mutable_common(),
ProfileTypeStr[stor.type],
req_id.c_str());
asset.set_thread_id(thread_id);
asset.mutable_metadata()->CopyFrom(metadata);
asset.set_data(stor.profile);

size_t asset_size = asset.ByteSizeLong();
if (asset_size > GRPC_MAX_SIZE) {
// Split the data into chunks
Debug("Continuous profile size larger than supported (%ld > %ld): "
"splitting profile into chunks\n", asset_size, GRPC_MAX_SIZE);
size_t prof_size = stor.profile.size();
size_t rest = asset_size - prof_size;
size_t offset = 0;
size_t chunk_size = GRPC_MAX_SIZE - rest - 100;

while (offset < prof_size) {
grpcagent::Asset chunk_asset;
PopulateCommon(chunk_asset.mutable_common(),
ProfileTypeStr[stor.type],
req_id.c_str());
chunk_asset.set_thread_id(thread_id);
chunk_asset.mutable_metadata()->CopyFrom(metadata);

if (offset + chunk_size > prof_size) {
chunk_size = prof_size - offset;
}

chunk_asset.set_data(stor.profile.substr(offset, chunk_size));

Debug("Sending continuous profile chunk of size: %ld\n",
chunk_asset.ByteSizeLong());
stream->Write(std::move(chunk_asset));
offset += chunk_size;
}
} else {
stream->Write(std::move(asset));
}
}

void GrpcAgent::handle_command_request(CommandRequestStor&& req) {
const grpcagent::CommandRequest& request = req.request;
Debug("Command Received: %s\n", request.DebugString().c_str());
Expand Down Expand Up @@ -1720,10 +1562,6 @@ void GrpcAgent::reconfigure(const grpcagent::CommandRequest& request) {
out["tracingModulesBlacklist"] = body.tracingmodulesblacklist();
}

if (body.has_contcpuprofile()) {
out["contCpuProfile"] = body.contcpuprofile();
}

DebugJSON("Reconfigure out: \n%s\n", out);

UpdateConfig(out.dump());
Expand Down Expand Up @@ -2065,16 +1903,6 @@ ErrorType GrpcAgent::do_start_prof_init(
uint64_t thread_id = args.thread_id();
uint64_t duration = args.duration();
StartProfiling start_profiling = nullptr;

// Check if continuous profiling is enabled for this profile type
if (type == ProfileType::kCpu) {
auto it = config_.find("contCpuProfile");
if (it != config_.end() && it->get<bool>() == true) {
// Continuous CPU profiling is enabled, don't allow manual CPU profiles
return ErrorType::EInProgressError;
}
}

switch (type) {
case ProfileType::kCpu:
start_profiling = &GrpcAgent::do_start_cpu_prof;
Expand All @@ -2097,12 +1925,12 @@ ErrorType GrpcAgent::do_start_prof_init(
opt.thread_id = thread_id;
opt.duration = duration;
opt.metadata_pb = std::move(args.metadata());
opt.start_timestamp = uv_hrtime();
}, options);

nsuv::ns_mutex::scoped_lock lock(profile_state_lock_);
ProfileState& profile_state = profile_state_[type];
ProfileStor stor{ req.requestid(),
uv_now(&loop_),
nullptr,
Comment on lines 1932 to 1934
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Timestamp captured from a non-loop thread can be stale

uv_now(&loop_) is evaluated while pushing the ProfileStor from any thread (JS or agent).
uv_now() is only updated by uv_run() or an explicit uv_update_time(&loop_).
If this call happens outside the agent’s event-loop thread the stored timestamp might lag behind, producing negative or wildly large durations later.

Suggestion:

-                    uv_now(&loop_),
+                    /* timestamp populated in agent thread to guarantee fresh clock */
+                    0,  // placeholder – to be filled in do_start_prof_end()

and immediately before assigning prof_stor.stream in do_start_prof_end():

prof_stor.timestamp = uv_now(&loop_);

This guarantees the call runs on the agent’s loop thread after a recent uv_update_time().


🏁 Script executed:

#!/bin/bash
# Locate the definition of do_start_prof_end and inspect ProfileStor usage context
rg -n "do_start_prof_end" -C3 agents/grpc/src/grpc_agent.cc
rg -n "ProfileStor" -C3 agents/grpc/src/grpc_agent.cc
sed -n '1900,1960p' agents/grpc/src/grpc_agent.cc

Length of output: 5898


🏁 Script executed:

#!/bin/bash
# Locate definition of StartProfStor to understand fields and usage
rg -n "StartProfStor" -C3 agents/grpc/src/grpc_agent.cc

Length of output: 259


🏁 Script executed:

#!/bin/bash
# Search for usage of timestamp in grpc_agent.cc to see where it's used
rg -n "timestamp" -C3 agents/grpc/src/grpc_agent.cc

Length of output: 358


Ensure timestamps are captured on the agent’s loop thread to avoid stale values

Capturing uv_now(&loop_) in do_start_prof_init may run off the loop thread—uv_now() is only refreshed by uv_run() or an explicit uv_update_time(). This can lead to negative or unexpectedly large profile durations. Defer timestamping to the loop thread in do_start_prof_end().

Locations to update:

  • do_start_prof_init initializer (agents/grpc/src/grpc_agent.cc:1932–1934)
  • Duration computation in do_start_prof_end (agents/grpc/src/grpc_agent.cc:1402)

Suggested diff:

--- a/agents/grpc/src/grpc_agent.cc
+++ b/agents/grpc/src/grpc_agent.cc
@@ -1932,7 +1932,8 @@ ErrorType GrpcAgent::do_start_prof_init(const grpcagent::CommandRequest& req,
   nsuv::ns_mutex::scoped_lock lock(profile_state_lock_);
   ProfileState& profile_state = profile_state_[type];
-  ProfileStor stor{ req.requestid(),
-                    uv_now(&loop_),
+  // timestamp will be set later on the loop thread to guarantee freshness
+  ProfileStor stor{ req.requestid(),
                     0,  
                     nullptr,
                     options };
@@ -1400,7 +1400,10 @@ ErrorType GrpcAgent::do_start_prof_end(ErrorType err,
   asset.set_thread_id(thread_id);
   asset.mutable_metadata()->CopyFrom(metadata);
   asset.set_complete(true);
-  asset.set_duration(uv_now(&loop_) - prof_stor.timestamp);
+  // refresh the cached loop clock so the elapsed time is measured against
+  // "now"; prof_stor.timestamp itself must be populated on the loop thread
+  // (see the do_start_prof_end() change described above), otherwise the
+  // computed duration can be stale or negative
+  uv_update_time(&loop_);
+  asset.set_duration(uv_now(&loop_) - prof_stor.timestamp);
   prof_stor.stream->Write(std::move(asset));
   prof_stor.stream->WritesDone();

This change ensures uv_now() is invoked immediately after an uv_update_time() in the loop, preventing stale timestamps.

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In agents/grpc/src/grpc_agent.cc around lines 1932 to 1934, the timestamp is
captured using uv_now(&loop_) potentially off the agent's event-loop thread,
which can cause stale or incorrect timing data. To fix this, remove the
uv_now(&loop_) call from the ProfileStor initialization in do_start_prof_init
and instead assign the timestamp inside do_start_prof_end on the loop thread
immediately after uv_update_time(&loop_) is called. This ensures the timestamp
is fresh and accurate, preventing negative or large duration calculations.

options };
auto iter = profile_state.pending_profiles_map.emplace(thread_id,
Expand Down Expand Up @@ -2134,8 +1962,7 @@ ErrorType GrpcAgent::do_start_prof_end(ErrorType err,
AssetStor{type, thread_id},
weak_from_this(),
agent_id_,
saas(),
EXPORT_ASSET);
saas());
if (err != ErrorType::ESuccess) {
send_asset_error(type, req_id, std::move(opts), stream, err);
return err;
Expand Down
Loading
Loading