diff --git a/be/src/exec/broker_writer.cpp b/be/src/exec/broker_writer.cpp index 069995871ffa56..2d30cc9f9b6a3a 100644 --- a/be/src/exec/broker_writer.cpp +++ b/be/src/exec/broker_writer.cpp @@ -139,8 +139,8 @@ Status BrokerWriter::write(const uint8_t* buf, size_t buf_len, size_t* written_l TBrokerOperationStatus response; try { Status status; - // 500ms is enough - BrokerServiceConnection client(client_cache(_state), broker_addr, 500, &status); + // we make timeout to be 5s, to avoid error in Network jitter scenarios. + BrokerServiceConnection client(client_cache(_state), broker_addr, 5000, &status); if (!status.ok()) { LOG(WARNING) << "Create broker write client failed. " << "broker=" << broker_addr @@ -148,17 +148,22 @@ Status BrokerWriter::write(const uint8_t* buf, size_t buf_len, size_t* written_l return status; } + // we do not re-try simplely, because broker server may already write data try { client->pwrite(response, request); } catch (apache::thrift::transport::TTransportException& e) { RETURN_IF_ERROR(client.reopen()); - client->pwrite(response, request); + + std::stringstream ss; + ss << "Fail to write to broker, broker:" << broker_addr << " failed:" << e.what(); + LOG(WARNING) << ss.str(); + return Status(TStatusCode::THRIFT_RPC_ERROR, ss.str()); } } catch (apache::thrift::TException& e) { std::stringstream ss; ss << "Fail to write to broker, broker:" << broker_addr << " failed:" << e.what(); LOG(WARNING) << ss.str(); - return Status(TStatusCode::THRIFT_RPC_ERROR, ss.str(), false); + return Status(TStatusCode::THRIFT_RPC_ERROR, ss.str()); } VLOG_ROW << "debug: send broker pwrite response: "