diff --git a/tools/rpc_replay/rpc_replay.cpp b/tools/rpc_replay/rpc_replay.cpp index d022e3cbdd..a08fbefa1c 100644 --- a/tools/rpc_replay/rpc_replay.cpp +++ b/tools/rpc_replay/rpc_replay.cpp @@ -135,14 +135,10 @@ static void* replay_thread(void* arg) { double req_rate = FLAGS_qps / (double)FLAGS_thread_num; brpc::SerializedRequest req; brpc::NsheadMessage nshead_req; - std::deque timeq; - size_t MAX_QUEUE_SIZE = (size_t)req_rate; - if (MAX_QUEUE_SIZE < 100) { - MAX_QUEUE_SIZE = 100; - } else if (MAX_QUEUE_SIZE > 2000) { - MAX_QUEUE_SIZE = 2000; - } - timeq.push_back(butil::gettimeofday_us()); + int64_t last_expected_time = butil::monotonic_time_ns(); + const int64_t interval = (int64_t) (1000000000L / req_rate); + // the max tolerant delay between end_time and expected_time. 10ms or 10 intervals + int64_t max_tolerant_delay = std::max((int64_t) 10000000L, 10 * interval); for (int i = 0; !brpc::IsAskedToQuit() && i < FLAGS_times; ++i) { brpc::SampleIterator it(FLAGS_dir); int j = 0; @@ -199,21 +195,15 @@ static void* replay_thread(void* arg) { brpc::NewCallback(handle_response, cntl, start_time, false); chan->CallMethod(NULL/*use rpc_dump_context in cntl instead*/, cntl, req_ptr, NULL/*ignore response*/, done); - const int64_t end_time = butil::gettimeofday_us(); - int64_t expected_elp = 0; - int64_t actual_elp = 0; - timeq.push_back(end_time); - if (timeq.size() > MAX_QUEUE_SIZE) { - actual_elp = end_time - timeq.front(); - timeq.pop_front(); - expected_elp = (size_t)(1000000 * timeq.size() / req_rate); - } else { - actual_elp = end_time - timeq.front(); - expected_elp = (size_t)(1000000 * (timeq.size() - 1) / req_rate); - } - if (actual_elp < expected_elp) { - bthread_usleep(expected_elp - actual_elp); + int64_t end_time = butil::monotonic_time_ns(); + int64_t expected_time = last_expected_time + interval; + if (end_time < expected_time) { + usleep((expected_time - end_time)/1000); } + if (end_time - expected_time > max_tolerant_delay) { + expected_time = end_time; + } + last_expected_time = expected_time; } } } @@ -254,6 +244,14 @@ int main(int argc, char* argv[]) { } } + const int rate_limit_per_thread = 1000000; + int req_rate_per_thread = FLAGS_qps / FLAGS_thread_num; + if (req_rate_per_thread > rate_limit_per_thread) { + LOG(ERROR) << "req_rate: " << (int64_t) req_rate_per_thread << " is too large in one thread. The rate limit is " + << rate_limit_per_thread << " in one thread"; + return false; + } + std::vector bids; std::vector pids; if (!FLAGS_use_bthread) {