diff --git a/nexla_sdk/resources/base_resource.py b/nexla_sdk/resources/base_resource.py index a97cd48..228c097 100644 --- a/nexla_sdk/resources/base_resource.py +++ b/nexla_sdk/resources/base_resource.py @@ -324,18 +324,46 @@ def copy( response = self._make_request("POST", path, json=serialized_options) return self._parse_response(response) - def get_audit_log(self, resource_id: int) -> List[Dict[str, Any]]: + def get_audit_log( + self, + resource_id: int, + from_date: Optional[str] = None, + to_date: Optional[str] = None, + event_filter: Optional[str] = None, + change_filter: Optional[str] = None, + page: Optional[int] = None, + per_page: Optional[int] = None, + ) -> List[Dict[str, Any]]: """ Get audit log for resource. Args: resource_id: Resource ID + from_date: Start date filter (YYYY-MM-DD) + to_date: End date filter (YYYY-MM-DD) + event_filter: Filter by event type + change_filter: Filter by change type + page: Page number for pagination + per_page: Items per page Returns: List of audit log entries """ path = f"{self._path}/{resource_id}/audit_log" - return self._make_request("GET", path) + params = {} + if from_date is not None: + params["from"] = from_date + if to_date is not None: + params["to"] = to_date + if event_filter is not None: + params["event_filter"] = event_filter + if change_filter is not None: + params["change_filter"] = change_filter + if page is not None: + params["page"] = page + if per_page is not None: + params["per_page"] = per_page + return self._make_request("GET", path, params=params) def get_accessors(self, resource_id: int) -> AccessorResponseList: """ diff --git a/nexla_sdk/resources/flows.py b/nexla_sdk/resources/flows.py index 4c6291d..4576191 100644 --- a/nexla_sdk/resources/flows.py +++ b/nexla_sdk/resources/flows.py @@ -271,6 +271,125 @@ def docs_recommendation( except Exception: return response + def get_flow_logs( + self, + flow_id: int, + from_date: str = None, + to_date: str = None, + severity: str = None, + run_id: int = None, + page: int = None, + per_page: int = None, + ) -> Dict[str, Any]: + """ + Get execution logs for a specific flow. + + Args: + flow_id: Flow ID + from_date: Start date filter (YYYY-MM-DD) + to_date: End date filter (YYYY-MM-DD) + severity: Filter by log severity + run_id: Filter by specific run ID + page: Page number for pagination + per_page: Items per page + + Returns: + Flow execution logs + """ + path = f"{self._path}/{flow_id}/flow/logs" + params = {} + if from_date is not None: + params["from"] = from_date + if to_date is not None: + params["to"] = to_date + if severity is not None: + params["severity"] = severity + if run_id is not None: + params["run_id"] = run_id + if page is not None: + params["page"] = page + if per_page is not None: + params["per_page"] = per_page + return self._make_request("GET", path, params=params) + + def search_flow_logs( + self, + flow_id: int, + run_ids: str = None, + severity: str = None, + search_string: str = None, + from_date: str = None, + to_date: str = None, + ) -> Dict[str, Any]: + """ + Advanced search for flow execution logs. + + Args: + flow_id: Flow ID + run_ids: Comma-separated list of run IDs to filter + severity: Filter by log severity + search_string: Free-text search string + from_date: Start date filter (YYYY-MM-DD) + to_date: End date filter (YYYY-MM-DD) + + Returns: + Matching flow logs + """ + path = f"{self._path}/{flow_id}/logs_v2" + params = {} + if run_ids is not None: + params["run_ids"] = run_ids + if severity is not None: + params["severity"] = severity + if search_string is not None: + params["search_string"] = search_string + if from_date is not None: + params["from"] = from_date + if to_date is not None: + params["to"] = to_date + return self._make_request("GET", path, params=params) + + def get_active_flows_metrics( + self, from_date: str = None, to_date: str = None, org_id: int = None + ) -> Dict[str, Any]: + """ + Get metrics for currently active flows. + + Args: + from_date: Start date filter (YYYY-MM-DD) + to_date: End date filter (YYYY-MM-DD) + org_id: Organization ID filter + + Returns: + Active flows metrics + """ + path = f"{self._path}/active_flows_metrics" + params = {} + if from_date is not None: + params["from"] = from_date + if to_date is not None: + params["to"] = to_date + if org_id is not None: + params["org_id"] = org_id + return self._make_request("GET", path, params=params) + + def get_run_status( + self, resource_type: str, resource_id: int, run_id: int + ) -> Dict[str, Any]: + """ + Get status of a specific flow run. + + Args: + resource_type: Type of resource (e.g., data_source, data_set, data_sink) + resource_id: Resource ID + run_id: Run ID + + Returns: + Run status information + """ + path = f"/{resource_type}s/{resource_id}/run_status/{run_id}" + return self._make_request("GET", path) + def get_logs( self, resource_type: str, diff --git a/nexla_sdk/resources/metrics.py b/nexla_sdk/resources/metrics.py index 1a43b55..d3b6f3a 100644 --- a/nexla_sdk/resources/metrics.py +++ b/nexla_sdk/resources/metrics.py @@ -92,6 +92,42 @@ def get_rate_limits(self) -> Dict[str, Any]: path = "/limits" return self._make_request("GET", path) + def get_resource_flow_metrics( + self, + resource_type: str, + resource_id: int, + metric_type: str = None, + ) -> Dict[str, Any]: + """ + Get flow metrics for a specific resource. + + Args: + resource_type: Type of resource (e.g., data_source, data_set, data_sink) + resource_id: Resource ID + metric_type: Specific metric type to retrieve (optional) + + Returns: + Flow metrics for the resource + """ + if metric_type: + path = f"/{resource_type}s/{resource_id}/flow/{metric_type}" + else: + path = f"/{resource_type}s/{resource_id}/flow" + return self._make_request("GET", path) + + def get_flow_metrics_summary(self, period: str) -> Dict[str, Any]: + """ + Get flow metrics summary for a given period. + + Args: + period: Time period for the summary (e.g., 'daily', 'total') + + Returns: + Flow metrics summary + """ + path = f"/data_flows/metrics/{period}" + return self._make_request("GET", path) + # Convenience wrappers for flow-level logs/metrics def get_flow_metrics( self, diff --git a/nexla_sdk/resources/organizations.py b/nexla_sdk/resources/organizations.py index 6ea79cc..d1368e1 100644 --- a/nexla_sdk/resources/organizations.py +++ b/nexla_sdk/resources/organizations.py @@ -212,30 +212,84 @@ def get_current_account_summary(self) -> AccountSummary: return AccountSummary.model_validate(response) def get_org_flow_account_metrics( - self, org_id: int, from_date: str, to_date: str = None + self, org_id: int, from_date: str, to_date: str = None, aggregate: str = None ) -> Dict[str, Any]: """Get total account metrics for an organization (flows).""" path = f"{self._path}/{org_id}/flows/account_metrics" params = {"from": from_date} if to_date: params["to"] = to_date + if aggregate: + params["aggregate"] = aggregate return self._make_request("GET", path, params=params) - def get_audit_log(self, org_id: int, **params) -> List[LogEntry]: + def get_audit_log( + self, + org_id: int, + from_date: str = None, + to_date: str = None, + event_filter: str = None, + change_filter: str = None, + page: int = None, + per_page: int = None, + ) -> List[LogEntry]: """ Get audit log for an organization. Args: org_id: Organization ID - **params: Additional query parameters (e.g., page, per_page) + from_date: Start date filter (YYYY-MM-DD) + to_date: End date filter (YYYY-MM-DD) + event_filter: Filter by event type + change_filter: Filter by change type + page: Page number for pagination + per_page: Items per page Returns: List of audit log entries """ path = f"{self._path}/{org_id}/audit_log" + params = {} + if from_date is not None: + params["from"] = from_date + if to_date is not None: + params["to"] = to_date + if event_filter is not None: + params["event_filter"] = event_filter + if change_filter is not None: + params["change_filter"] = change_filter + if page is not None: + params["page"] = page + if per_page is not None: + params["per_page"] = per_page response = self._make_request("GET", path, params=params) return [LogEntry.model_validate(item) for item in response] + def get_flow_status_metrics( + self, org_id: int, from_date: str = None, page: int = None, per_page: int = None + ) -> Dict[str, Any]: + """ + Get flow status metrics for an organization. + + Args: + org_id: Organization ID + from_date: Start date filter (YYYY-MM-DD) + page: Page number for pagination + per_page: Items per page + + Returns: + Flow status metrics + """ + path = f"{self._path}/{org_id}/flows/status_metrics" + params = {} + if from_date is not None: + params["from"] = from_date + if page is not None: + params["page"] = page + if per_page is not None: + params["per_page"] = per_page + return self._make_request("GET", path, params=params) + def get_resource_audit_log( self, org_id: int, resource_type: str, **params ) -> List[LogEntry]: diff --git a/nexla_sdk/resources/users.py b/nexla_sdk/resources/users.py index 1bff4d8..10039a6 100644 --- a/nexla_sdk/resources/users.py +++ b/nexla_sdk/resources/users.py @@ -178,9 +178,45 @@ def delete_quarantine_settings(self, user_id: int) -> Dict[str, Any]: path = f"{self._path}/{user_id}/quarantine_settings" return self._make_request("DELETE", path) - def get_audit_log(self, user_id: int, **params) -> List[Dict[str, Any]]: - """Get audit log for a user.""" + def get_audit_log( + self, + user_id: int, + from_date: str = None, + to_date: str = None, + event_filter: str = None, + change_filter: str = None, + page: int = None, + per_page: int = None, + ) -> List[Dict[str, Any]]: + """ + Get audit log for a user. + + Args: + user_id: User ID + from_date: Start date filter (YYYY-MM-DD) + to_date: End date filter (YYYY-MM-DD) + event_filter: Filter by event type + change_filter: Filter by change type + page: Page number for pagination + per_page: Items per page + + Returns: + List of audit log entries + """ path = f"{self._path}/{user_id}/audit_log" + params = {} + if from_date is not None: + params["from"] = from_date + if to_date is not None: + params["to"] = to_date + if event_filter is not None: + params["event_filter"] = event_filter + if change_filter is not None: + params["change_filter"] = change_filter + if page is not None: + params["page"] = page + if per_page is not None: + params["per_page"] = per_page response = self._make_request("GET", path, params=params) if isinstance(response, list): return response @@ -225,6 +261,7 @@ def get_account_metrics( from_date: str, to_date: Optional[str] = None, org_id: Optional[int] = None, + aggregate: Optional[str] = None, ) -> Dict[str, Any]: """ Get total account metrics for user. @@ -234,6 +271,7 @@ def get_account_metrics( from_date: Start date (YYYY-MM-DD) to_date: End date (optional) org_id: Organization ID (for users in multiple orgs) + aggregate: Aggregation mode (optional) Returns: Account metrics @@ -244,9 +282,36 @@ def get_account_metrics( params["to"] = to_date if org_id: params["org_id"] = org_id + if aggregate: + params["aggregate"] = aggregate return self._make_request("GET", path, params=params) + def get_flow_status_metrics( + self, user_id: int, from_date: str = None, page: int = None, per_page: int = None + ) -> Dict[str, Any]: + """ + Get flow status metrics for a user. + + Args: + user_id: User ID + from_date: Start date filter (YYYY-MM-DD) + page: Page number for pagination + per_page: Items per page + + Returns: + Flow status metrics + """ + path = f"{self._path}/{user_id}/flows/status_metrics" + params = {} + if from_date is not None: + params["from"] = from_date + if page is not None: + params["page"] = page + if per_page is not None: + params["per_page"] = per_page + return self._make_request("GET", path, params=params) + def get_dashboard_metrics( self, user_id: int, access_role: Optional[str] = None ) -> Dict[str, Any]: