-
Notifications
You must be signed in to change notification settings - Fork 3k
Add idempotency adapter and E2E coverage #14773
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
Outdated
Show resolved
Hide resolved
| long now = System.currentTimeMillis(); | ||
| boolean expired = | ||
| existing.status == IdempotencyEntry.Status.FINALIZED | ||
| && (now - existing.firstSeenMillis) > idempotencyLifetimeMillis; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use Instant.now().minusMillis(existing.firstSeenMillis) here as that is more readable when adding isBefore / isAfter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could also make this a function on the entry itself, so that you'd only call if(!entry.expired()) here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Thanks!
| } | ||
|
|
||
| // Test hooks/configuration for idempotency behavior | ||
| public static void simulate503OnFirstSuccessForKey(String key) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not really a fan of such a way to configure this. We should explore doing this similarly how remote scan planning sets the planning behavior for particular tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the suggestion. I removed the static hook from CatalogHandlers and mirrored the remote planning pattern:
- Added a per-adapter IdempotencyBehavior in RESTCatalogAdapter and apply it post‑success in execute(...).
- Tests configure behavior via the adapter (e.g., adapterForRESTServer.simulate503OnFirstSuccessForKey(key)), while routes still call CatalogHandlers.withIdempotency(...).
| AuthSession httpSession = am.initSession(httpBase, conf); | ||
| RESTClient http = httpBase.withAuthSession(httpSession); | ||
|
|
||
| CreateTableRequest req = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seems a bit weird to create the request here. Can we not mock stuff as we do in a bunch of other tests and verify that the correct requests were sent?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I’ve added a reusable verify helper and now assert the request shape (method/path/headers, including Idempotency‑Key) after the POSTs. I’m keeping a real CreateTableRequest in the E2E tests because it’s needed to exercise serialization and replay.
| return new IdempotentCreateEnv(ns, ident, headers, http, req); | ||
| } | ||
|
|
||
| private static class IdempotentCreateEnv { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we really need this wrapper class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed this wrapped class
47aae84 to
a8a0a16
Compare
|
@nastra @amogh-jahagirdar @singhpk234 Could you please take a look when you have a moment? Thanks! |
| simulate503OnFirstSuccessKeys.add(key); | ||
| } | ||
|
|
||
| private static OAuthTokenResponse handleOAuthRequest(Object body) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we wrap this to plan-api endpoints too ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Plan endpoints aren’t wrapped with withIdempotency yet. I’ll follow up with a separate PR to add Idempotency-Key support for the plan APIs (wrap handlers + add tests) and we can extend the simulation coverage there.
| } | ||
|
|
||
| /** Test helper to simulate a transient 503 after the first successful mutation for a key. */ | ||
| public void simulate503OnFirstSuccessForKey(String key) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit : should we make it generic like simulate status code or something ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion. I kept it 503-specific since the test hook is only used to exercise retry behavior for transient 503/commit-state-unknown; happy to generalize later if we add tests needing other codes.
| String key = "idemp-drop-void"; | ||
| Namespace ns = Namespace.of("ns_void"); | ||
| TableIdentifier ident = TableIdentifier.of(ns, "t_void"); | ||
| restCatalog.createNamespace(ns, ImmutableMap.of()); | ||
| Pair<RESTClient, Map<String, String>> httpAndHeaders = httpAndHeaders(key); | ||
| RESTClient http = httpAndHeaders.first(); | ||
| Map<String, String> headers = httpAndHeaders.second(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i wonder if all this can be abstracted in a utils which takes key, namespace and table_name ? wdyt ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the comment! I have added a smaller helper method for the common code.
| String key = keyHeader.get().value(); | ||
|
|
||
| // check existing entry and TTL | ||
| IdempotencyEntry existing = IDEMPOTENCY_STORE.get(key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is rare in this use case but maybe we should use computeIfAbsent here to avoid any race conditions get and put issues
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good call — I updated this to avoid the get/remove/put race by using a single atomic map operation. I now use IDEMPOTENCY_STORE.compute(...) to either reuse the current entry or replace it if missing/expired, and use a per-call isLeader flag so only the thread that created the new entry executes the action while others wait and replay the finalized result. I used compute (instead of computeIfAbsent) because we also need to replace expired entries.
a8a0a16 to
4a81530
Compare
singhpk234
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly LGTM, thanks @huaxingao left minor comments
|
|
||
| String key = keyHeader.get().value(); | ||
|
|
||
| AtomicBoolean isLeader = new AtomicBoolean(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whats a Leader ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
“Leader” here means the request thread that won the IDEMPOTENCY_STORE.compute(...) and created (or replaced) the IN_PROGRESS entry for this Idempotency-Key. That thread executes the action and finalizes the entry; all other concurrent requests for the same key are “followers” that wait on the latch and then replay the finalized result/error.
| .hasMessageContaining(ident.toString()); | ||
|
|
||
| // Clean up | ||
| restCatalog.dropTable(ident); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need explicit drop ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. This is not needed. Removed.
| try { | ||
| idempotencyLifetimeMillis = Duration.parse(isoDuration).toMillis(); | ||
| } catch (Exception e) { | ||
| // ignore parse errors; keep default |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry i might have missed this in first pass, this is public imho we should throw illegalArgs ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. I made this test-only (@VisibleForTesting + package-private) and changed it to throw IllegalArgumentException on invalid ISO durations rather than silently ignoring parse errors.
|
|
||
| private AuthSession authSession = AuthSession.EMPTY; | ||
| private PlanningBehavior planningBehavior; | ||
| private final java.util.Set<String> simulate503OnFirstSuccessKeys = Sets.newConcurrentHashSet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we import this instead of inline ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Thanks
This is the 4th PR for Idempotency Key. Introduce test-only idempotency in
RESTCatalogAdapter:Add E2E tests:
First PR: #14649
SecondPR: #14700
Third PR: #14740