Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions cm_python_highlevel_clients/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# High-Level Clients for CleverMaps Data Operations

A Python library providing high-level client interfaces for common CleverMaps operations.

## Overview

This package simplifies interaction with CleverMaps projects by providing intuitive, high-level methods that handle complex workflows behind the scenes. It's built on top of the CleverMaps Python OpenAPI SDK.

## Features

- **Data Dumping**: Export datasets from CleverMaps projects to CSV files
- **Data Loading**: Upload CSV files to CleverMaps projects with automatic handling of:
- Single-part uploads for small files
- Multipart uploads with GZIP compression for large files
- Automatic file splitting while preserving CSV structure

## Installation

```bash
pip install cm-python-openapi-sdk
```
17 changes: 17 additions & 0 deletions cm_python_highlevel_clients/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# clients/__init__.py
"""
High-level clients for working with the API.

This package provides specialized clients that combine multiple API endpoints
into convenient high-level operations.
"""

from .base_api_client import BaseClient
from .data_dump_client import DataDumpClient
from .load_data_client import LoadDataClient

__all__ = [
'BaseClient',
'DataDumpClient',
'LoadDataClient',
]
130 changes: 130 additions & 0 deletions cm_python_highlevel_clients/base_api_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# clients/base_client.py
from typing import Optional
import logging

logger = logging.getLogger(__name__)


class BaseClient:
    """
    Base client that provides access to all low-level APIs.
    All specialized clients inherit from this.
    """

    def __init__(self, api_token: str, host: Optional[str] = None):
        """
        Initialize the base client.

        Args:
            api_token: API access token (required)
            host: API host URL (optional, uses default if not provided)
        """
        # Imported lazily so this module can be loaded even before the SDK is.
        from cm_python_openapi_sdk import ApiClient, Configuration

        configuration = Configuration()
        if host:
            configuration.host = host

        # One shared low-level transport, plus a cache of instantiated *Api objects.
        self._api_client = ApiClient(configuration=configuration)
        self._api_cache = {}

        # Immediately trade the long-lived API token for a short-lived bearer token.
        self.exchange_token(api_token)

    @property
    def api_client(self):
        """Access to the raw API client."""
        return self._api_client

    def exchange_token(self, api_token: str) -> str:
        """
        Exchange an API token for a bearer token and configure the API client to use it.

        Args:
            api_token: The API access token to exchange

        Returns:
            The bearer token string

        Raises:
            Exception: If token exchange fails
        """
        from cm_python_openapi_sdk.models.token_request_dto import TokenRequestDTO

        logger.info("Exchanging API token for bearer token")

        # The API token travels in the refresh_token slot of the request DTO.
        request = TokenRequestDTO(refresh_token=api_token)
        response = self._get_api('AuthenticationApi').get_token(token_request_dto=request)
        bearer_token = response.access_token

        # Every subsequent call through self._api_client now carries this token.
        self._api_client.configuration.access_token = bearer_token

        logger.info("Successfully exchanged token and configured API client")

        return bearer_token

    def _get_api(self, api_class_name: str):
        """
        Helper to lazily instantiate API classes.

        Args:
            api_class_name: Name of the API class (e.g., 'JobsApi', 'DataUploadApi')

        Returns:
            Instance of the requested API class
        """
        instance = self._api_cache.get(api_class_name)
        if instance is None:
            # Resolve e.g. 'JobsApi' -> cm_python_openapi_sdk.api.jobs_api.JobsApi
            import importlib
            module_name = f'cm_python_openapi_sdk.api.{self._to_snake_case(api_class_name)}'
            api_class = getattr(importlib.import_module(module_name), api_class_name)
            instance = api_class(self._api_client)
            self._api_cache[api_class_name] = instance
        return instance

    def _to_snake_case(self, class_name: str) -> str:
        """
        Convert PascalCase to snake_case for module names.

        Args:
            class_name: Class name in PascalCase (e.g., 'JobsApi')

        Returns:
            Module name in snake_case (e.g., 'jobs_api')
        """
        import re
        # Prefix each run of capitals with '_', lowercase everything,
        # then drop the underscore the leading capital produced.
        return re.sub('([A-Z]+)', r'_\1', class_name).lower().lstrip('_')

    def __getattr__(self, name: str):
        """
        Automatically expose all *Api classes from generated SDK.
        Usage: client.jobs_api, client.data_upload_api, etc.

        Args:
            name: Attribute name in snake_case ending with '_api'

        Returns:
            Instance of the requested API class

        Raises:
            AttributeError: If the attribute doesn't exist
        """
        if name.endswith('_api'):
            # snake_case -> PascalCase, e.g. 'jobs_api' -> 'JobsApi'
            pascal_name = ''.join(part.capitalize() for part in name.split('_'))
            try:
                return self._get_api(pascal_name)
            except (ImportError, AttributeError):
                # Fall through to the normal missing-attribute error below.
                pass

        raise AttributeError(f"'{type(self).__name__}' has no attribute '{name}'")
178 changes: 178 additions & 0 deletions cm_python_highlevel_clients/data_dump_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# clients/data_dump_client.py
import time
import os
import logging
from typing import Optional

import requests

from . import BaseClient

logger = logging.getLogger(__name__)


class DataDumpClient(BaseClient):
    """High-level client that exports project datasets to local CSV files."""

    def dump_dataset_to_csv(
        self,
        project_id: str,
        dataset: str,
        output_path: str,
        poll_interval: int = 5
    ) -> str:
        """
        Dump a dataset to a CSV file.

        This method handles the complete workflow:
        1. Submits a data dump job
        2. Polls for job completion
        3. Downloads the resulting CSV file

        Args:
            project_id: The ID of the project containing the dataset
            dataset: The name of the dataset to dump
            output_path: Directory path where to save the CSV file
            poll_interval: Seconds between status checks (default: 5)

        Returns:
            Path to the downloaded CSV file

        Raises:
            Exception: If the job fails or the download fails
        """
        logger.debug(f"Starting data dump for project {project_id}, dataset {dataset}")

        # Import models here to avoid circular dependencies
        from cm_python_openapi_sdk.models.data_dump_job_request import DataDumpJobRequest
        from cm_python_openapi_sdk.models.data_dump_request import DataDumpRequest
        from cm_python_openapi_sdk.models.general_job_request import GeneralJobRequest

        # Create an output directory if it doesn't exist
        os.makedirs(output_path, exist_ok=True)
        logger.debug(f"Ensured output directory exists: {output_path}")

        # Prepare the job request
        data_dump_request = DataDumpRequest(
            dataset=dataset
        )

        job_request = DataDumpJobRequest(
            type="dataDump",
            projectId=project_id,
            content=data_dump_request
        )

        submit_request = GeneralJobRequest(actual_instance=job_request)

        # Submit the job
        logger.debug("Submitting data dump job")
        job_response = self.jobs_api.submit_job_execution(submit_request)
        job_id = job_response.id
        logger.debug(f"Job submitted with ID: {job_id}")

        # Poll for job completion
        logger.debug("Polling for job completion")
        self._wait_for_job_completion(job_id, poll_interval)

        # Get the result file URL and download.
        # BUGFIX: the original indexed result["links"][0] BEFORE the
        # None-check, so a response without "links" raised TypeError and
        # the intended error message below was unreachable.
        job_status = self.jobs_api.get_job_status(job_id, type="dataDump")
        links = (job_status.result or {}).get("links") or []
        result_url = links[0].get("href") if links else None

        if not result_url:
            error_msg = "No result file URL found in job response"
            logger.error(error_msg)
            raise Exception(error_msg)

        logger.debug(f"Got result file URL: {result_url}")

        # Download the file
        local_filename = os.path.join(output_path, f"{dataset}.csv")
        self._download_file(result_url, local_filename)

        logger.debug(f"File downloaded successfully: {local_filename}")
        return local_filename

    def _wait_for_job_completion(
        self,
        job_id: str,
        poll_interval: int = 5,
        timeout: Optional[int] = None
    ) -> None:
        """
        Poll for job completion until it succeeds or fails.

        Args:
            job_id: The ID of the job to monitor
            poll_interval: Seconds between status checks
            timeout: Maximum seconds to wait (None for no timeout)

        Raises:
            Exception: If the job fails
            TimeoutError: If timeout is reached
        """
        start_time = time.time()

        while True:
            job_status = self.jobs_api.get_job_status(job_id, type="dataDump")
            logger.debug(f"Current job status: {job_status.status}")

            if job_status.status == "SUCCEEDED":
                logger.debug("Job completed successfully")
                return
            elif job_status.status == "FAILED":
                error_msg = f"Job failed: {job_status.error}"
                logger.error(error_msg)
                raise Exception(error_msg)

            # Check timeout. BUGFIX: compare against None so an explicit
            # timeout=0 does not silently mean "wait forever".
            if timeout is not None and (time.time() - start_time) > timeout:
                raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds")

            time.sleep(poll_interval)

    def _download_file(self, result_url: str, local_filename: str) -> None:
        """
        Download a file from the API.

        Args:
            result_url: Relative URL path to the file
            local_filename: Local path where to save the file

        Raises:
            Exception: If the download fails
        """
        logger.debug(f"Downloading file to: {local_filename}")

        # Get the full URL by combining the base URL with the relative path.
        # BUGFIX: only strip a leading "/rest" when it is actually present
        # (the host presumably already ends in /rest — the original sliced
        # off the first 5 characters unconditionally, corrupting any URL
        # that does not start with "/rest").
        rest_prefix = "/rest"
        if result_url.startswith(rest_prefix):
            result_url = result_url[len(rest_prefix):]
        full_url = self.api_client.configuration.host + result_url
        logger.debug(f"Full URL: {full_url}")

        try:
            # Get the authorization header from the API client configuration
            headers = {
                'Authorization': f"Bearer {self.api_client.configuration.access_token}"
            }
            # Stream to disk in 1 MiB chunks so large dumps are not
            # buffered whole in memory.
            with requests.get(full_url, headers=headers, stream=True) as response:
                response.raise_for_status()
                with open(local_filename, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=1 << 20):
                        f.write(chunk)

            logger.debug(f"File downloaded successfully: {local_filename}")

        except Exception as e:
            error_msg = f"Failed to download file: {str(e)}"
            logger.error(error_msg)
            # Chain the original exception so the root cause stays visible.
            raise Exception(error_msg) from e

    def get_job_status(self, job_id: str):
        """
        Get the current status of a data dump job.

        Args:
            job_id: The ID of the job

        Returns:
            Job status response
        """
        return self.jobs_api.get_job_status(job_id, type="dataDump")
Loading