diff --git a/python/ccf/tx_id.py b/python/ccf/tx_id.py index a69a33b62795..73176d1fa190 100644 --- a/python/ccf/tx_id.py +++ b/python/ccf/tx_id.py @@ -5,7 +5,7 @@ from typing import Optional -@dataclass +@dataclass(order=True) class TxID: view: int seqno: int diff --git a/samples/apps/logging/logging.cpp b/samples/apps/logging/logging.cpp index 6b5f8109bb49..9d28fcb95951 100644 --- a/samples/apps/logging/logging.cpp +++ b/samples/apps/logging/logging.cpp @@ -139,6 +139,7 @@ namespace loggingapp { private: std::map records; + std::mutex txid_lock; ccf::TxID current_txid = {}; public: @@ -147,6 +148,7 @@ namespace loggingapp void handle_committed_transaction( const ccf::TxID& tx_id, const kv::ReadOnlyStorePtr& store) { + std::lock_guard lock(txid_lock); auto tx_diff = store->create_tx_diff(); auto m = tx_diff.template diff(PRIVATE_RECORDS); m->foreach([this](const size_t& k, std::optional v) -> bool { @@ -167,6 +169,7 @@ namespace loggingapp std::optional next_requested() { + std::lock_guard lock(txid_lock); return current_txid.seqno + 1; } @@ -179,6 +182,12 @@ namespace loggingapp } return search->second; } + + ccf::TxID get_current_txid() + { + std::lock_guard lock(txid_lock); + return current_txid; + } }; // SNIPPET: inherit_frontend @@ -489,7 +498,7 @@ namespace loggingapp .set_auto_schema() .install(); - auto get_committed = [this](auto& ctx, nlohmann::json&&) { + auto get_committed = [this](auto& ctx) { // Parse id from query const auto parsed_query = http::parse_query(ctx.rpc_ctx->get_request_query()); @@ -498,29 +507,43 @@ namespace loggingapp size_t id; if (!http::get_query_value(parsed_query, "id", id, error_reason)) { - return ccf::make_error( - HTTP_STATUS_BAD_REQUEST, - ccf::errors::InvalidQueryParameterValue, - std::move(error_reason)); + auto response = nlohmann::json{{ + "error", + { + {"code", ccf::errors::InvalidQueryParameterValue}, + {"message", std::move(error_reason)}, + }, + }}; + + ctx.rpc_ctx->set_response(response, HTTP_STATUS_BAD_REQUEST); + return; } auto record = committed_records->get(id); if (record.has_value()) { - return ccf::make_success(LoggingGet::Out{record.value()}); + nlohmann::json response = LoggingGet::Out{record.value()}; + ctx.rpc_ctx->set_response(response, HTTP_STATUS_OK); + return; } - return ccf::make_error( - HTTP_STATUS_BAD_REQUEST, - ccf::errors::ResourceNotFound, - fmt::format("No such record: {}.", id)); + auto response = nlohmann::json{{ + "error", + { + {"code", ccf::errors::ResourceNotFound}, + {"message", fmt::format("No such record: {}.", id)}, + {"current_txid", committed_records->get_current_txid().to_str()}, + }, + }}; + + ctx.rpc_ctx->set_response(response, HTTP_STATUS_BAD_REQUEST); }; make_read_only_endpoint( "/log/private/committed", HTTP_GET, - ccf::json_read_only_adapter(get_committed), + get_committed, ccf::no_auth_required) .set_auto_schema() .add_query_parameter("id") diff --git a/tests/e2e_logging.py b/tests/e2e_logging.py index ac515401a417..f20c9ea226c9 100644 --- a/tests/e2e_logging.py +++ b/tests/e2e_logging.py @@ -1620,7 +1620,7 @@ def test_post_local_commit_failure(network, args): "Check that the committed index gets populated with creates and deletes" ) @reqs.supports_methods("/app/log/private/committed", "/app/log/private") -def test_committed_index(network, args): +def test_committed_index(network, args, timeout=5): remote_node, _ = network.find_primary() with remote_node.client() as c: res = c.post("/app/log/private/install_committed_index") @@ -1630,7 +1630,23 @@ def test_committed_index(network, args): _, log_id = network.txs.get_log_id(txid) - r = network.txs.request(log_id, priv=True, url_suffix="committed") + start_time = time.time() + end_time = start_time + timeout + while time.time() < end_time: + + r = network.txs.request(log_id, priv=True, url_suffix="committed") + if r.status_code == http.HTTPStatus.OK.value: + break + + current_tx_id = TxID.from_str(r.body.json()["error"]["current_txid"]) + + LOG.info(f"Current Tx ID ({current_tx_id}) - Tx ID ({txid})") + if current_tx_id >= txid: + break + + LOG.warning("Current Tx ID is behind, retrying...") + time.sleep(1) + assert r.status_code == http.HTTPStatus.OK.value, r.status_code assert r.body.json() == {"msg": f"Private message at idx {log_id} [0]"} @@ -1638,15 +1654,13 @@ def test_committed_index(network, args): r = network.txs.request(log_id, priv=True) assert r.status_code == http.HTTPStatus.BAD_REQUEST.value, r.status_code - assert r.body.json() == { - "error": {"code": "ResourceNotFound", "message": f"No such record: {log_id}."} - } + assert r.body.json()["error"]["message"] == f"No such record: {log_id}." + assert r.body.json()["error"]["code"] == "ResourceNotFound" r = network.txs.request(log_id, priv=True, url_suffix="committed") assert r.status_code == http.HTTPStatus.BAD_REQUEST.value, r.status_code - assert r.body.json() == { - "error": {"code": "ResourceNotFound", "message": f"No such record: {log_id}."} - } + assert r.body.json()["error"]["message"] == f"No such record: {log_id}." + assert r.body.json()["error"]["code"] == "ResourceNotFound" def run_udp_tests(args):