refactor(athena): use pyathena native driver, drop ibis dependency #2271
goldmedal wants to merge 2 commits into
The athena connector now uses pyathena directly with a Trino-style type lexer to materialise cursor results into PyArrow tables, removing the ibis-framework[athena] dependency from the athena extra.

Highlights
- connector/athena.py: native pyathena cursor; type strings parsed via sqlglot (varchar, decimal(p,s), array<T>, row(...), map<K,V>, etc.).
- model/data_source.py::get_athena_connection: preserves the Web-Identity-Token (OIDC -> AssumeRoleWithWebIdentity) and access-key auth flows; returns a pyathena.connection.Connection.
- pyproject.toml: athena extra -> pyathena[pandas]>=3.

Tests
- tests/unit/test_athena_connector.py mocks pyathena cursor + boto3 STS to verify the type lexer, cursor->Arrow materialisation, error mapping, and all three credential resolution paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Walkthrough
This PR replaces the Ibis-based Athena connector with a native PyAthena implementation. It adds a new connector module that parses Athena/Trino types to PyArrow, builds tables from DB-API cursors, handles AWS credential resolution (OIDC and explicit keys), and provides query/dry-run/close operations with proper error handling and integration into the connector factory.

Changes
Native Athena Connector
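To make the type-parsing step in the walkthrough concrete, here is a minimal sketch of a recursive parser for Trino-style type strings. It is not the PR's implementation: the PR uses sqlglot and emits PyArrow types, whereas this stdlib-only version returns nested tuples. The names `split_top_level` and `parse_type` are hypothetical, and only the shapes listed in the PR description (`decimal(p,s)`, `array<T>`, `map<K,V>`, `row(...)`) are handled.

```python
# Hypothetical sketch: the PR parses type strings with sqlglot and builds
# PyArrow types; this version only returns nested tuples for illustration.

def split_top_level(text):
    """Split on commas that are not nested inside <...> or (...)."""
    parts, depth, start = [], 0, 0
    for i, ch in enumerate(text):
        if ch in "<(":
            depth += 1
        elif ch in ">)":
            depth -= 1
        elif ch == "," and depth == 0:
            parts.append(text[start:i].strip())
            start = i + 1
    parts.append(text[start:].strip())
    return parts


def parse_type(text):
    """Parse e.g. 'array<varchar>' into ('array', ('varchar',))."""
    text = text.strip().lower()
    # Position of the first opening bracket, or -1 for a plain scalar.
    idx = min((i for i in (text.find("<"), text.find("(")) if i != -1), default=-1)
    if idx == -1:
        return (text,)
    name, inner = text[:idx].strip(), text[idx + 1 : -1]
    args = split_top_level(inner)
    if name in ("array", "map"):
        return (name, *[parse_type(a) for a in args])
    if name == "row":
        # Each field is written as "<field_name> <type>".
        fields = []
        for arg in args:
            field_name, field_type = arg.split(None, 1)
            fields.append((field_name, parse_type(field_type)))
        return ("row", tuple(fields))
    # Parameterised scalars such as decimal(10,2) or varchar(20).
    return (name, *[int(a) for a in args])


print(parse_type("map<varchar,array<integer>>"))
# ('map', ('varchar',), ('array', ('integer',)))
```

Types with trailing qualifiers (for example `timestamp(3) with time zone`) are outside what this sketch handles; a production lexer would need a fallback for unknown shapes, which the PR's tests describe as "unknown/null fallbacks".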
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
core/wren/src/wren/model/data_source.py (1)
250-294: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Critical type mismatch: `get_athena_connection` returns `pyathena.Connection` but `get_connection` is annotated `→ BaseBackend`.

The dispatcher in `get_connection` (line 233) calls `getattr(self, f"get_{self.name}_connection")(info)` and expects a `BaseBackend`. Athena callers will receive a `pyathena.Connection` instead, breaking any code that uses ibis methods like `.sql()`, `.table()`, or `.list_tables()`.

Additionally, this method duplicates `_build_connect_kwargs` with critical divergences already in place:

| Aspect | `get_athena_connection` | `_build_connect_kwargs` |
|---|---|---|
| `schema_name` | Unconditionally set (even if `None`) | Only set when truthy |
| `kill_on_interrupt` | Not set | Defaulted to `True` |
| `info.kwargs` | Ignored | Merged into kwargs |

Refactor by reusing the shared helper to avoid future divergence:

♻️ Suggested refactor

     @staticmethod
     def get_athena_connection(info: AthenaConnectionInfo):
         """Open a pyathena DB-API connection.
         ...
         """
         from pyathena import connect  # noqa: PLC0415
    +    from wren.connector.athena import _build_connect_kwargs  # noqa: PLC0415

    -    kwargs: dict[str, Any] = {
    -        "s3_staging_dir": info.s3_staging_dir.get_secret_value(),
    -        "schema_name": info.schema_name,
    -    }
    -    ...
    -    return connect(**kwargs)
    +    return connect(**_build_connect_kwargs(info))

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@core/wren/src/wren/model/data_source.py` around lines 250 - 294, get_athena_connection currently returns a raw pyathena.Connection (breaking get_connection's expectation of a BaseBackend) and reimplements logic already in _build_connect_kwargs with subtle differences; modify get_athena_connection to call the shared _build_connect_kwargs(info) to produce kwargs (so schema_name, kill_on_interrupt and info.kwargs are handled identically), then pass those kwargs into pyathena connect and wrap/return the resulting connection as the expected BaseBackend (the same backend type other branches return) so the dispatcher in get_connection and callers using .sql(), .table(), .list_tables() get a consistent ibis backend object; keep references to get_athena_connection, get_connection and _build_connect_kwargs to locate the change.
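For readers who want to see what a shared kwargs builder with the three behaviours from the table could look like, here is a minimal sketch. It is an assumption-laden stand-in, not the PR's `_build_connect_kwargs`: `FakeAthenaInfo` and `build_connect_kwargs` are hypothetical names, secrets are plain strings, and letting caller-supplied extras override the defaults is an assumed merge order that the review does not specify.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class FakeAthenaInfo:
    """Hypothetical stand-in for AthenaConnectionInfo (plain strings, no secrets)."""

    s3_staging_dir: str
    schema_name: Optional[str] = None
    kwargs: Dict[str, Any] = field(default_factory=dict)


def build_connect_kwargs(info: FakeAthenaInfo) -> Dict[str, Any]:
    """Single place that decides which keyword arguments reach connect()."""
    kwargs: Dict[str, Any] = {
        "s3_staging_dir": info.s3_staging_dir,
        "kill_on_interrupt": True,  # defaulted, as in the review table
    }
    if info.schema_name:  # only set when truthy
        kwargs["schema_name"] = info.schema_name
    # Assumption: caller-supplied extras are merged last and may override.
    kwargs.update(info.kwargs)
    return kwargs


print(build_connect_kwargs(FakeAthenaInfo("s3://bucket/out")))
# {'s3_staging_dir': 's3://bucket/out', 'kill_on_interrupt': True}
```

With one builder, both the connector and `get_athena_connection` would call the same function, so a later change to any of the three rows in the table lands in both code paths at once.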
🧹 Nitpick comments (3)
core/wren/src/wren/connector/athena.py (3)
294-310: ⚡ Quick win

Push `limit` into the cursor fetch instead of slicing after `fetchall`.

`_build_athena_arrow_table` calls `cursor.fetchall()`, materialising every row in memory, after which `table.slice(0, limit)` discards the surplus. For a query that returns millions of rows but `limit=100`, this is a large amount of wasted memory and network transfer from Athena's result store.

Consider threading `limit` through to the table builder and using `cursor.fetchmany(limit)`:

♻️ Suggested change

    -def _build_athena_arrow_table(cursor) -> pa.Table:
    +def _build_athena_arrow_table(cursor, limit: int | None = None) -> pa.Table:
         """Materialise a pyathena DB-API cursor into a PyArrow table."""
         if cursor.description is None:
             return pa.table({})
    -    rows = cursor.fetchall()
    +    rows = cursor.fetchmany(limit) if limit is not None else cursor.fetchall()

     def query(self, sql: str, limit: int | None = None) -> pa.Table:
         try:
             with contextlib.closing(self.connection.cursor()) as cursor:
                 cursor.execute(sql)
    -            table = _build_athena_arrow_table(cursor)
    -            if limit is not None:
    -                table = table.slice(0, limit)
    +            table = _build_athena_arrow_table(cursor, limit=limit)
                 return table

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@core/wren/src/wren/connector/athena.py` around lines 294 - 310, The query method currently calls _build_athena_arrow_table which uses cursor.fetchall(), causing full materialization before applying limit; modify query to pass the limit into _build_athena_arrow_table (or create a new _build_athena_arrow_table_with_limit) and change the builder to use cursor.fetchmany(limit) (or iterate fetchmany in batches) instead of fetchall(), so that when query(sql, limit=...) is called the cursor only fetches up to the requested rows and no post-slice is needed.
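The `fetchmany(limit)` versus `fetchall()` behaviour can be tried against any DB-API cursor. The sketch below uses the standard-library `sqlite3` driver as a stand-in for pyathena and returns plain rows rather than a PyArrow table; `fetch_rows` is a hypothetical helper, and a positive `limit` is assumed. Whether pyathena's cursor also reduces network transfer this way depends on its paging implementation, which this example does not demonstrate.

```python
import sqlite3


def fetch_rows(cursor, limit=None):
    """Return (column_names, rows), fetching at most `limit` rows when given."""
    if cursor.description is None:  # statement produced no result set
        return [], []
    columns = [col[0] for col in cursor.description]
    # fetchmany(limit) stops after `limit` rows; fetchall() drains the cursor.
    rows = cursor.fetchmany(limit) if limit is not None else cursor.fetchall()
    return columns, rows


conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE t (n INTEGER)")
cur.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(1000)])

cur.execute("SELECT n FROM t ORDER BY n")
columns, rows = fetch_rows(cur, limit=3)
print(columns, rows)  # ['n'] [(0,), (1,), (2,)]
```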
329-332: 💤 Low value

Consider logging the swallowed exception on close.

Silently dropping close-time errors makes it impossible to diagnose connection-pool/socket issues. A debug-level log preserves the cleanup semantics while keeping a breadcrumb.

    -        try:
    -            self.connection.close()
    -        except Exception:
    -            pass
    +        try:
    +            self.connection.close()
    +        except Exception as e:  # noqa: BLE001
    +            logger.debug("Failed to close Athena connection: %s", e)
             finally:
                 self.connection = None

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@core/wren/src/wren/connector/athena.py` around lines 329 - 332, The close block currently swallows exceptions from self.connection.close(); replace the bare except/pass with a debug-level log that records the exception (e.g., use the class/module logger or logging.getLogger(__name__) to call logger.debug("Error closing Athena connection", exc_info=True)) so close-time errors are preserved for diagnosis while keeping cleanup behavior.
257-272: 💤 Low value

STS web-identity credentials expire; consider refresh strategy for long-lived connectors.

`assume_role_with_web_identity` returns credentials with a fixed TTL (default 1 hour). The credentials are extracted and passed as static values to `pyathena.connect()` at `AthenaConnector.__init__()` (lines 270–272), so any `AthenaConnector` instance reused beyond that window will fail on subsequent queries with an expired-credentials error.

If `AthenaConnector` instances are short-lived (per-request), this is fine. If Engine instances are cached or pooled for long-running sessions, consider either: (a) constructing a boto3 `RefreshableCredentials` provider and passing it via the `botocore_session` kwarg (if pyathena supports it), or (b) recreating the connection on credential expiry.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@core/wren/src/wren/connector/athena.py` around lines 257 - 272, The code in AthenaConnector.__init__ calls sts.assume_role_with_web_identity and injects the returned static creds into kwargs passed to pyathena.connect, which will expire; replace this static injection with a refreshable credential strategy: either create a botocore.session.Session with botocore.credentials.RefreshableCredentials (or use boto3's get_credentials refresh mechanism) that calls assume_role_with_web_identity when expired and pass that session via the botocore_session kwarg to pyathena.connect, or implement lazy/transparent reconnect logic in AthenaConnector (e.g., detect expired-credentials errors on query execution and re-run assume_role_with_web_identity to recreate the pyathena connection); locate the logic around assume_role_with_web_identity, the creds assignment to kwargs["aws_access_key_id"/"aws_secret_access_key"/"aws_session_token"], and pyathena.connect usage to apply this change.
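Option (b), re-fetching credentials shortly before they expire, can be sketched without any AWS dependency. Everything here is hypothetical: `RefreshingCredentials`, the `expires_at` key (epoch seconds), and the injected `fetch` callable. In real use `fetch` would wrap the `assume_role_with_web_identity` call and convert the returned expiration time into a timestamp, and the connector would rebuild its connection whenever fresh credentials are issued.

```python
import time
from typing import Callable, Optional


class RefreshingCredentials:
    """Cache credentials and re-fetch them shortly before they expire."""

    def __init__(
        self,
        fetch: Callable[[], dict],
        skew_seconds: float = 300.0,
        clock: Callable[[], float] = time.time,
    ):
        self._fetch = fetch          # hypothetical: returns a dict with "expires_at"
        self._skew = skew_seconds    # refresh this long before the real expiry
        self._clock = clock          # injectable so the behaviour can be tested
        self._creds: Optional[dict] = None

    def get(self) -> dict:
        expired = (
            self._creds is None
            or self._clock() >= self._creds["expires_at"] - self._skew
        )
        if expired:
            self._creds = self._fetch()
        return self._creds


# Demo with a fake clock and a fake one-hour token issuer.
now = [0.0]
issued = []


def fake_fetch() -> dict:
    issued.append(now[0])
    return {"token": f"t{len(issued)}", "expires_at": now[0] + 3600}


creds = RefreshingCredentials(fake_fetch, skew_seconds=300, clock=lambda: now[0])
first = creds.get()["token"]   # fetched at t=0
now[0] = 1000.0
second = creds.get()["token"]  # still cached
now[0] = 3400.0
third = creds.get()["token"]   # within 300s of expiry, so re-fetched
print(first, second, third)    # t1 t1 t2
```

The skew window trades a slightly earlier refresh for not starting a query with credentials that expire mid-flight; the 300-second value is an arbitrary choice for the sketch.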
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@core/wren/tests/unit/test_athena_connector.py`:
- Around line 307-309: The test
test_data_source_get_athena_connection_returns_pyathena_connection currently
only asserts the side-effect _pyathena_connect_calls; update it to also assert
the returned value from DataSourceExtension.get_athena_connection(_info()) is
the expected pyathena connection object (for example, compare to the mocked
connection object or assert it's truthy/has expected attributes). Locate the
call to DataSourceExtension.get_athena_connection in the test and add a concise
assertion on the returned_connection variable (e.g., returned_connection is
mock_conn or returned_connection is not None / has expected type) while keeping
the existing _pyathena_connect_calls assertion.
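A minimal sketch of the requested assertion, using `unittest.mock` and a hypothetical `get_connection` helper in place of `DataSourceExtension.get_athena_connection` (the real test uses its own `_pyathena_connect_calls` recorder, which is not reproduced here):

```python
from unittest import mock


def get_connection(connect, **kwargs):
    """Hypothetical stand-in for DataSourceExtension.get_athena_connection."""
    return connect(**kwargs)


def test_returns_the_driver_connection():
    fake_connect = mock.Mock(name="pyathena.connect")
    returned = get_connection(fake_connect, s3_staging_dir="s3://bucket/out")
    # Side effect: the driver was called once with the expected kwargs.
    fake_connect.assert_called_once_with(s3_staging_dir="s3://bucket/out")
    # Return value: the caller receives exactly what the driver produced.
    assert returned is fake_connect.return_value


test_returns_the_driver_connection()
```

Checking identity against `return_value` catches a regression where the method calls the driver correctly but forgets to return the connection.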
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 984d7cb3-c615-4abb-94a3-50ac7f11131c
⛔ Files ignored due to path filters (1)
- `core/wren/uv.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (5)
- `core/wren/pyproject.toml`
- `core/wren/src/wren/connector/athena.py`
- `core/wren/src/wren/connector/factory.py`
- `core/wren/src/wren/model/data_source.py`
- `core/wren/tests/unit/test_athena_connector.py`
CodeRabbit flagged that data_source.get_athena_connection re-implemented the pyathena connect-kwargs logic that also lives in connector/athena.py, letting them drift on schema_name / kill_on_interrupt / info.kwargs, and the get_connection signature claimed BaseBackend even though the Athena path returned a raw pyathena Connection.

Route data_source.get_athena_connection through the connector's shared _build_connect_kwargs builder, and widen the return-type annotation to a BackendOrConnection Union so the type matches reality without forcing a runtime pyathena import.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
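One common way to widen an annotation "without forcing a runtime import" is a `TYPE_CHECKING` guard combined with a string annotation. The sketch below illustrates that pattern only; the commit's actual definition of `BackendOrConnection` is not shown on this page, and the `factories` dispatcher is a hypothetical simplification of `get_connection`.

```python
from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
    # Seen only by type checkers; neither package is imported at runtime.
    from ibis.backends import BaseBackend
    from pyathena.connection import Connection

    BackendOrConnection = Union[BaseBackend, Connection]


def get_connection(factories: dict, name: str, info: object) -> "BackendOrConnection":
    """Dispatch to the per-source factory (hypothetical simplification)."""
    return factories[name](info)


# Runs even when neither ibis nor pyathena is installed.
conn = get_connection({"athena": lambda info: ("connection for", info)}, "athena", "cfg")
print(conn)  # ('connection for', 'cfg')
```

The trade-off is that the alias does not exist at runtime, so `typing.get_type_hints(get_connection)` would raise `NameError` in this sketch; that is acceptable when annotations are consumed only by static checkers.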
Replies to CodeRabbit review-body findings (no inline IDs available):
Superseded by #2313: these seven native-driver refactors were consolidated into a single feature branch to resolve shared-file conflicts (data_source.py, pyproject.toml, uv.lock, factory.py, etc.) once instead of seven times.
Summary
The athena connector now uses `pyathena` directly with a Trino-style type lexer for cursor results. The athena extra no longer pulls in `ibis-framework[athena]`.

Highlights
- `connector/athena.py` (new): native pyathena DB-API cursor; type strings parsed via sqlglot (`varchar`, `decimal(p,s)`, `array<T>`, `row(...)`, `map<K,V>`, etc.) and materialised into PyArrow.
- `connector/factory.py`: routes `DataSource.athena` to the new `wren.connector.athena` module.
- `model/data_source.py::get_athena_connection`: preserves the Web-Identity-Token (OIDC -> `AssumeRoleWithWebIdentity`) and explicit access-key auth flows; falls back to the default boto3 credential chain otherwise. Now returns a `pyathena.connection.Connection`.
- `pyproject.toml`: athena extra -> `pyathena[pandas]>=3`.

Tests
`tests/unit/test_athena_connector.py` (new, mocked) covers:
- `decimal`, `array`, `map`, `row` and unknown/null fallbacks.
- `EXPLAIN` dry-run, `limit` slicing, error wrapping (`INVALID_SQL` + correct `ErrorPhase`).
- `AssumeRoleWithWebIdentity` (mocked boto3), and default credential chain fallback.

Test plan
- `just install-dev`
- `just lint` -> all checks passed
- `uv run pytest tests/unit/test_athena_connector.py -v` -> 25 passed
- `just test-unit` -> 179 passed in this branch (one unrelated pre-existing failure on `test_context_cli.py::test_validate_strict_warns`, also fails on main).

Summary by CodeRabbit
Release Notes