Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions nexla_sdk/resources/base_resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,18 +324,46 @@ def copy(
response = self._make_request("POST", path, json=serialized_options)
return self._parse_response(response)

def get_audit_log(self, resource_id: int) -> List[Dict[str, Any]]:
def get_audit_log(
self,
resource_id: int,
from_date: Optional[str] = None,
to_date: Optional[str] = None,
event_filter: Optional[str] = None,
change_filter: Optional[str] = None,
page: Optional[int] = None,
per_page: Optional[int] = None,
) -> List[Dict[str, Any]]:
"""
Get audit log for resource.

Args:
resource_id: Resource ID
from_date: Start date filter (YYYY-MM-DD)
to_date: End date filter (YYYY-MM-DD)
event_filter: Filter by event type
change_filter: Filter by change type
page: Page number for pagination
per_page: Items per page

Returns:
List of audit log entries
"""
path = f"{self._path}/{resource_id}/audit_log"
return self._make_request("GET", path)
params = {}
if from_date is not None:
params["from"] = from_date
if to_date is not None:
params["to"] = to_date
if event_filter is not None:
params["event_filter"] = event_filter
if change_filter is not None:
params["change_filter"] = change_filter
if page is not None:
params["page"] = page
if per_page is not None:
params["per_page"] = per_page
return self._make_request("GET", path, params=params)

def get_accessors(self, resource_id: int) -> AccessorResponseList:
"""
Expand Down
119 changes: 119 additions & 0 deletions nexla_sdk/resources/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,125 @@ def docs_recommendation(
except Exception:
return response

def get_flow_logs(
self,
flow_id: int,
from_date: str = None,
to_date: str = None,
severity: str = None,
run_id: int = None,
page: int = None,
per_page: int = None,
) -> Dict[str, Any]:
"""
Get execution logs for a specific flow.

Args:
flow_id: Flow ID
from_date: Start date filter (YYYY-MM-DD)
to_date: End date filter (YYYY-MM-DD)
severity: Filter by log severity
run_id: Filter by specific run ID
page: Page number for pagination
per_page: Items per page

Returns:
Flow execution logs
"""
path = f"{self._path}/{flow_id}/flow/logs"
params = {}
if from_date is not None:
params["from"] = from_date
if to_date is not None:
params["to"] = to_date
if severity is not None:
params["severity"] = severity
if run_id is not None:
params["run_id"] = run_id
if page is not None:
params["page"] = page
if per_page is not None:
params["per_page"] = per_page
return self._make_request("GET", path, params=params)

def search_flow_logs(
self,
flow_id: int,
run_ids: str = None,
severity: str = None,
search_string: str = None,
from_date: str = None,
to_date: str = None,
) -> Dict[str, Any]:
"""
Advanced search for flow execution logs.

Args:
flow_id: Flow ID
run_ids: Comma-separated list of run IDs to filter
severity: Filter by log severity
search_string: Free-text search string
from_date: Start date filter (YYYY-MM-DD)
to_date: End date filter (YYYY-MM-DD)

Returns:
Matching flow logs
"""
path = f"{self._path}/{flow_id}/logs_v2"
params = {}
if run_ids is not None:
params["run_ids"] = run_ids
if severity is not None:
params["severity"] = severity
if search_string is not None:
params["search_string"] = search_string
if from_date is not None:
params["from"] = from_date
if to_date is not None:
params["to"] = to_date
return self._make_request("GET", path, params=params)

def get_active_flows_metrics(
self, from_date: str = None, to_date: str = None, org_id: int = None
) -> Dict[str, Any]:
"""
Get metrics for currently active flows.

Args:
from_date: Start date filter (YYYY-MM-DD)
to_date: End date filter (YYYY-MM-DD)
org_id: Organization ID filter

Returns:
Active flows metrics
"""
path = f"{self._path}/active_flows_metrics"
params = {}
if from_date is not None:
params["from"] = from_date
if to_date is not None:
params["to"] = to_date
if org_id is not None:
params["org_id"] = org_id
return self._make_request("GET", path, params=params)

def get_run_status(
self, resource_type: str, resource_id: int, run_id: int
) -> Dict[str, Any]:
"""
Get status of a specific flow run.

Args:
resource_type: Type of resource (e.g., data_source, data_set, data_sink)
resource_id: Resource ID
run_id: Run ID

Returns:
Run status information
"""
path = f"/{resource_type}s/{resource_id}/run_status/{run_id}"
return self._make_request("GET", path)

def get_logs(
self,
resource_type: str,
Expand Down
36 changes: 36 additions & 0 deletions nexla_sdk/resources/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,42 @@ def get_rate_limits(self) -> Dict[str, Any]:
path = "/limits"
return self._make_request("GET", path)

def get_resource_flow_metrics(
self,
resource_type: str,
resource_id: int,
metric_type: str = None,
) -> Dict[str, Any]:
"""
Get flow metrics for a specific resource.

Args:
resource_type: Type of resource (e.g., data_source, data_set, data_sink)
resource_id: Resource ID
metric_type: Specific metric type to retrieve (optional)

Returns:
Flow metrics for the resource
"""
if metric_type:
path = f"/{resource_type}s/{resource_id}/flow/{metric_type}"
else:
path = f"/{resource_type}s/{resource_id}/flow"
return self._make_request("GET", path)

def get_flow_metrics_summary(self, period: str) -> Dict[str, Any]:
"""
Get flow metrics summary for a given period.

Args:
period: Time period for the summary (e.g., 'daily', 'total')

Returns:
Flow metrics summary
"""
path = f"/data_flows/metrics/{period}"
return self._make_request("GET", path)

# Convenience wrappers for flow-level logs/metrics
def get_flow_metrics(
self,
Expand Down
60 changes: 57 additions & 3 deletions nexla_sdk/resources/organizations.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,30 +212,84 @@ def get_current_account_summary(self) -> AccountSummary:
return AccountSummary.model_validate(response)

def get_org_flow_account_metrics(
self, org_id: int, from_date: str, to_date: str = None
self, org_id: int, from_date: str, to_date: str = None, aggregate: str = None
) -> Dict[str, Any]:
"""Get total account metrics for an organization (flows)."""
path = f"{self._path}/{org_id}/flows/account_metrics"
params = {"from": from_date}
if to_date:
params["to"] = to_date
if aggregate:
params["aggregate"] = aggregate
return self._make_request("GET", path, params=params)

def get_audit_log(self, org_id: int, **params) -> List[LogEntry]:
def get_audit_log(
self,
org_id: int,
from_date: str = None,
to_date: str = None,
event_filter: str = None,
change_filter: str = None,
page: int = None,
per_page: int = None,
) -> List[LogEntry]:
"""
Get audit log for an organization.

Args:
org_id: Organization ID
**params: Additional query parameters (e.g., page, per_page)
from_date: Start date filter (YYYY-MM-DD)
to_date: End date filter (YYYY-MM-DD)
event_filter: Filter by event type
change_filter: Filter by change type
page: Page number for pagination
per_page: Items per page

Returns:
List of audit log entries
"""
path = f"{self._path}/{org_id}/audit_log"
params = {}
if from_date is not None:
params["from"] = from_date
if to_date is not None:
params["to"] = to_date
if event_filter is not None:
params["event_filter"] = event_filter
if change_filter is not None:
params["change_filter"] = change_filter
if page is not None:
params["page"] = page
if per_page is not None:
params["per_page"] = per_page
response = self._make_request("GET", path, params=params)
return [LogEntry.model_validate(item) for item in response]

def get_flow_status_metrics(
self, org_id: int, from_date: str = None, page: int = None, per_page: int = None
) -> Dict[str, Any]:
"""
Get flow status metrics for an organization.

Args:
org_id: Organization ID
from_date: Start date filter (YYYY-MM-DD)
page: Page number for pagination
per_page: Items per page

Returns:
Flow status metrics
"""
path = f"{self._path}/{org_id}/flows/status_metrics"
params = {}
if from_date is not None:
params["from"] = from_date
if page is not None:
params["page"] = page
if per_page is not None:
params["per_page"] = per_page
return self._make_request("GET", path, params=params)

def get_resource_audit_log(
self, org_id: int, resource_type: str, **params
) -> List[LogEntry]:
Expand Down
Loading
Loading