Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/ccf/tx_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Optional


@dataclass
@dataclass(order=True)
class TxID:
view: int
seqno: int
Expand Down
45 changes: 34 additions & 11 deletions samples/apps/logging/logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ namespace loggingapp
{
private:
std::map<size_t, std::string> records;
std::mutex txid_lock;
ccf::TxID current_txid = {};

public:
Expand All @@ -147,6 +148,7 @@ namespace loggingapp
void handle_committed_transaction(
const ccf::TxID& tx_id, const kv::ReadOnlyStorePtr& store)
{
std::lock_guard<std::mutex> lock(txid_lock);
auto tx_diff = store->create_tx_diff();
auto m = tx_diff.template diff<RecordsMap>(PRIVATE_RECORDS);
m->foreach([this](const size_t& k, std::optional<std::string> v) -> bool {
Expand All @@ -167,6 +169,7 @@ namespace loggingapp

std::optional<ccf::SeqNo> next_requested()
{
std::lock_guard<std::mutex> lock(txid_lock);
return current_txid.seqno + 1;
}

Expand All @@ -179,6 +182,12 @@ namespace loggingapp
}
return search->second;
}

ccf::TxID get_current_txid()
{
std::lock_guard<std::mutex> lock(txid_lock);
return current_txid;
}
};

// SNIPPET: inherit_frontend
Expand Down Expand Up @@ -489,7 +498,7 @@ namespace loggingapp
.set_auto_schema<void, void>()
.install();

auto get_committed = [this](auto& ctx, nlohmann::json&&) {
auto get_committed = [this](auto& ctx) {
// Parse id from query
const auto parsed_query =
http::parse_query(ctx.rpc_ctx->get_request_query());
Expand All @@ -498,29 +507,43 @@ namespace loggingapp
size_t id;
if (!http::get_query_value(parsed_query, "id", id, error_reason))
{
return ccf::make_error(
HTTP_STATUS_BAD_REQUEST,
ccf::errors::InvalidQueryParameterValue,
std::move(error_reason));
auto response = nlohmann::json{{
"error",
{
{"code", ccf::errors::InvalidQueryParameterValue},
{"message", std::move(error_reason)},
},
}};

ctx.rpc_ctx->set_response(response, HTTP_STATUS_BAD_REQUEST);
return;
}

auto record = committed_records->get(id);

if (record.has_value())
{
return ccf::make_success(LoggingGet::Out{record.value()});
nlohmann::json response = LoggingGet::Out{record.value()};
ctx.rpc_ctx->set_response(response, HTTP_STATUS_OK);
return;
}

return ccf::make_error(
HTTP_STATUS_BAD_REQUEST,
ccf::errors::ResourceNotFound,
fmt::format("No such record: {}.", id));
auto response = nlohmann::json{{
"error",
{
{"code", ccf::errors::ResourceNotFound},
{"message", fmt::format("No such record: {}.", id)},
{"current_txid", committed_records->get_current_txid().to_str()},
},
}};

ctx.rpc_ctx->set_response(response, HTTP_STATUS_BAD_REQUEST);
};

make_read_only_endpoint(
"/log/private/committed",
HTTP_GET,
ccf::json_read_only_adapter(get_committed),
get_committed,
ccf::no_auth_required)
.set_auto_schema<void, LoggingGet::Out>()
.add_query_parameter<size_t>("id")
Expand Down
30 changes: 22 additions & 8 deletions tests/e2e_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -1620,7 +1620,7 @@ def test_post_local_commit_failure(network, args):
"Check that the committed index gets populated with creates and deletes"
)
@reqs.supports_methods("/app/log/private/committed", "/app/log/private")
def test_committed_index(network, args):
def test_committed_index(network, args, timeout=5):
remote_node, _ = network.find_primary()
with remote_node.client() as c:
res = c.post("/app/log/private/install_committed_index")
Expand All @@ -1630,23 +1630,37 @@ def test_committed_index(network, args):

_, log_id = network.txs.get_log_id(txid)

r = network.txs.request(log_id, priv=True, url_suffix="committed")
start_time = time.time()
end_time = start_time + timeout
while time.time() < end_time:

r = network.txs.request(log_id, priv=True, url_suffix="committed")
if r.status_code == http.HTTPStatus.OK.value:
break

current_tx_id = TxID.from_str(r.body.json()["error"]["current_txid"])

LOG.info(f"Current Tx ID ({current_tx_id}) - Tx ID ({txid})")
if current_tx_id >= txid:
break

LOG.warning("Current Tx ID is behind, retrying...")
time.sleep(1)

assert r.status_code == http.HTTPStatus.OK.value, r.status_code
assert r.body.json() == {"msg": f"Private message at idx {log_id} [0]"}

network.txs.delete(log_id, priv=True)

r = network.txs.request(log_id, priv=True)
assert r.status_code == http.HTTPStatus.BAD_REQUEST.value, r.status_code
assert r.body.json() == {
"error": {"code": "ResourceNotFound", "message": f"No such record: {log_id}."}
}
assert r.body.json()["error"]["message"] == f"No such record: {log_id}."
assert r.body.json()["error"]["code"] == "ResourceNotFound"

r = network.txs.request(log_id, priv=True, url_suffix="committed")
assert r.status_code == http.HTTPStatus.BAD_REQUEST.value, r.status_code
assert r.body.json() == {
"error": {"code": "ResourceNotFound", "message": f"No such record: {log_id}."}
}
assert r.body.json()["error"]["message"] == f"No such record: {log_id}."
assert r.body.json()["error"]["code"] == "ResourceNotFound"


def run_udp_tests(args):
Expand Down