Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 20 additions & 22 deletions tools/rpc_replay/rpc_replay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,10 @@ static void* replay_thread(void* arg) {
double req_rate = FLAGS_qps / (double)FLAGS_thread_num;
brpc::SerializedRequest req;
brpc::NsheadMessage nshead_req;
std::deque<int64_t> timeq;
size_t MAX_QUEUE_SIZE = (size_t)req_rate;
if (MAX_QUEUE_SIZE < 100) {
MAX_QUEUE_SIZE = 100;
} else if (MAX_QUEUE_SIZE > 2000) {
MAX_QUEUE_SIZE = 2000;
}
timeq.push_back(butil::gettimeofday_us());
int64_t last_expected_time = butil::monotonic_time_ns();
const int64_t interval = (int64_t) (1000000000L / req_rate);
// the max tolerant delay between end_time and expected_time. 10ms or 10 intervals
int64_t max_tolerant_delay = std::max((int64_t) 10000000L, 10 * interval);
for (int i = 0; !brpc::IsAskedToQuit() && i < FLAGS_times; ++i) {
brpc::SampleIterator it(FLAGS_dir);
int j = 0;
Expand Down Expand Up @@ -199,21 +195,15 @@ static void* replay_thread(void* arg) {
brpc::NewCallback(handle_response, cntl, start_time, false);
chan->CallMethod(NULL/*use rpc_dump_context in cntl instead*/,
cntl, req_ptr, NULL/*ignore response*/, done);
const int64_t end_time = butil::gettimeofday_us();
int64_t expected_elp = 0;
int64_t actual_elp = 0;
timeq.push_back(end_time);
if (timeq.size() > MAX_QUEUE_SIZE) {
actual_elp = end_time - timeq.front();
timeq.pop_front();
expected_elp = (size_t)(1000000 * timeq.size() / req_rate);
} else {
actual_elp = end_time - timeq.front();
expected_elp = (size_t)(1000000 * (timeq.size() - 1) / req_rate);
}
if (actual_elp < expected_elp) {
bthread_usleep(expected_elp - actual_elp);
int64_t end_time = butil::monotonic_time_ns();
int64_t expected_time = last_expected_time + interval;
if (end_time < expected_time) {
usleep((expected_time - end_time)/1000);
}
if (end_time - expected_time > max_tolerant_delay) {
expected_time = end_time;
}
last_expected_time = expected_time;
}
}
}
Expand Down Expand Up @@ -254,6 +244,14 @@ int main(int argc, char* argv[]) {
}
}

const int rate_limit_per_thread = 1000000;
int req_rate_per_thread = FLAGS_qps / FLAGS_thread_num;
if (req_rate_per_thread > rate_limit_per_thread) {
LOG(ERROR) << "req_rate: " << (int64_t) req_rate_per_thread << " is too large in one thread. The rate limit is "
<< rate_limit_per_thread << " in one thread";
return false;
}

std::vector<bthread_t> bids;
std::vector<pthread_t> pids;
if (!FLAGS_use_bthread) {
Expand Down