From f3d887b4d7f1c8f79f594adc9b7ab3b73272f81b Mon Sep 17 00:00:00 2001 From: Daniel Valansky Date: Thu, 23 Oct 2025 16:05:54 +0200 Subject: [PATCH 1/2] Added highlevel clients for loading and dumping datasets --- cm_python_highlevel_clients/README.md | 20 + cm_python_highlevel_clients/__init__.py | 17 + .../base_api_client.py | 130 +++++++ .../data_dump_client.py | 178 +++++++++ .../load_data_client.py | 358 ++++++++++++++++++ cm_python_highlevel_clients/utils/__init__.py | 9 + .../utils/csv_file_splitter.py | 110 ++++++ requirements.txt | 1 + test/highlevel_clients/__init__.py | 0 test/highlevel_clients/test_data_dump.py | 35 ++ test/highlevel_clients/test_data_load.py | 75 ++++ 11 files changed, 933 insertions(+) create mode 100644 cm_python_highlevel_clients/README.md create mode 100644 cm_python_highlevel_clients/__init__.py create mode 100644 cm_python_highlevel_clients/base_api_client.py create mode 100644 cm_python_highlevel_clients/data_dump_client.py create mode 100644 cm_python_highlevel_clients/load_data_client.py create mode 100644 cm_python_highlevel_clients/utils/__init__.py create mode 100644 cm_python_highlevel_clients/utils/csv_file_splitter.py create mode 100644 test/highlevel_clients/__init__.py create mode 100644 test/highlevel_clients/test_data_dump.py create mode 100644 test/highlevel_clients/test_data_load.py diff --git a/cm_python_highlevel_clients/README.md b/cm_python_highlevel_clients/README.md new file mode 100644 index 0000000..6686ba2 --- /dev/null +++ b/cm_python_highlevel_clients/README.md @@ -0,0 +1,20 @@ +# High-Level Clients for CleverMaps Data Operations + +A Python library providing high-level client interfaces for common CleverMaps operations. + +## Overview + +This package simplifies interaction with CleverMaps projects by providing intuitive, high-level methods that handle complex workflows behind the scenes. It's built on top of the CleverMaps Python OpenAPI SDK. 
+## Features + +- **Data Dumping**: Export datasets from CleverMaps projects to CSV files +- **Data Loading**: Upload CSV files to CleverMaps projects with automatic handling of: + - Single-part uploads for small files + - Multipart uploads with GZIP compression for large files + - Automatic file splitting while preserving CSV structure + +## Installation + +```bash +pip install cm-python-openapi-sdk +``` diff --git a/cm_python_highlevel_clients/__init__.py b/cm_python_highlevel_clients/__init__.py new file mode 100644 index 0000000..61ecec9 --- /dev/null +++ b/cm_python_highlevel_clients/__init__.py @@ -0,0 +1,17 @@ +# clients/__init__.py +""" +High-level clients for working with the API. + +This package provides specialized clients that combine multiple API endpoints +into convenient high-level operations. +""" + +from .base_api_client import BaseClient +from .data_dump_client import DataDumpClient +from .load_data_client import LoadDataClient + +__all__ = [ + 'BaseClient', + 'DataDumpClient', + 'LoadDataClient', +] \ No newline at end of file diff --git a/cm_python_highlevel_clients/base_api_client.py b/cm_python_highlevel_clients/base_api_client.py new file mode 100644 index 0000000..9c76509 --- /dev/null +++ b/cm_python_highlevel_clients/base_api_client.py @@ -0,0 +1,130 @@ +# clients/base_client.py +from typing import Optional +import logging + +logger = logging.getLogger(__name__) + + +class BaseClient: + """ + Base client that provides access to all low-level APIs. + All specialized clients inherit from this. + """ + + def __init__(self, api_token: str, host: Optional[str] = None): + """ + Initialize the base client. 
+ + Args: + api_token: API access token (required) + host: API host URL (optional, uses default if not provided) + """ + from cm_python_openapi_sdk import ApiClient, Configuration + + # Create configuration + config = Configuration() + if host: + config.host = host + + # Create API client + self._api_client = ApiClient(configuration=config) + self._api_cache = {} + + # Exchange token for bearer token + self.exchange_token(api_token) + + @property + def api_client(self): + """Access to the raw API client.""" + return self._api_client + + def exchange_token(self, api_token: str) -> str: + """ + Exchange an API token for a bearer token and configure the API client to use it. + + Args: + api_token: The API access token to exchange + + Returns: + The bearer token string + + Raises: + Exception: If token exchange fails + """ + from cm_python_openapi_sdk.models.token_request_dto import TokenRequestDTO + + logger.info("Exchanging API token for bearer token") + + # Create the token request + token_request = TokenRequestDTO(refresh_token=api_token) + + # Get the authentication API and call get_token + auth_api = self._get_api('AuthenticationApi') + token_response = auth_api.get_token(token_request_dto=token_request) + + # Extract the bearer token from the response + bearer_token = token_response.access_token + + # Configure the API client to use the bearer token + self._api_client.configuration.access_token = bearer_token + + logger.info("Successfully exchanged token and configured API client") + + return bearer_token + + def _get_api(self, api_class_name: str): + """ + Helper to lazily instantiate API classes. 
+ + Args: + api_class_name: Name of the API class (e.g., 'JobsApi', 'DataUploadApi') + + Returns: + Instance of the requested API class + """ + if api_class_name not in self._api_cache: + # Dynamically import the API class + import importlib + module = importlib.import_module(f'cm_python_openapi_sdk.api.{self._to_snake_case(api_class_name)}') + api_class = getattr(module, api_class_name) + self._api_cache[api_class_name] = api_class(self._api_client) + return self._api_cache[api_class_name] + + def _to_snake_case(self, class_name: str) -> str: + """ + Convert PascalCase to snake_case for module names. + + Args: + class_name: Class name in PascalCase (e.g., 'JobsApi') + + Returns: + Module name in snake_case (e.g., 'jobs_api') + """ + import re + # Insert underscore before uppercase letters and convert to lowercase + snake = re.sub('([A-Z]+)', r'_\1', class_name).lower() + return snake.lstrip('_') + + def __getattr__(self, name: str): + """ + Automatically expose all *Api classes from generated SDK. + Usage: client.jobs_api, client.data_upload_api, etc. + + Args: + name: Attribute name in snake_case ending with '_api' + + Returns: + Instance of the requested API class + + Raises: + AttributeError: If the attribute doesn't exist + """ + if name.endswith('_api'): + # Convert snake_case to PascalCase + class_name = ''.join(word.capitalize() for word in name.split('_')) + try: + return self._get_api(class_name) + except (ImportError, AttributeError): + pass + + raise AttributeError(f"'{type(self).__name__}' has no attribute '{name}'") diff --git a/cm_python_highlevel_clients/data_dump_client.py b/cm_python_highlevel_clients/data_dump_client.py new file mode 100644 index 0000000..52dcdf7 --- /dev/null +++ b/cm_python_highlevel_clients/data_dump_client.py @@ -0,0 +1,178 @@ +# clients/data_dump_client.py +import time +import os +import logging +from typing import Optional + +import requests + +from . 
import BaseClient + +logger = logging.getLogger(__name__) + + +class DataDumpClient(BaseClient): + def dump_dataset_to_csv( + self, + project_id: str, + dataset: str, + output_path: str, + poll_interval: int = 5 + ) -> str: + """ + Dump a dataset to a CSV file. + + This method handles the complete workflow: + 1. Submits a data dump job + 2. Polls for job completion + 3. Downloads the resulting CSV file + + Args: + project_id: The ID of the project containing the dataset + dataset: The name of the dataset to dump + output_path: Directory path where to save the CSV file + poll_interval: Seconds between status checks (default: 5) + + Returns: + Path to the downloaded CSV file + + Raises: + Exception: If the job fails or the download fails + """ + logger.debug(f"Starting data dump for project {project_id}, dataset {dataset}") + + # Import models here to avoid circular dependencies + from cm_python_openapi_sdk.models.data_dump_job_request import DataDumpJobRequest + from cm_python_openapi_sdk.models.data_dump_request import DataDumpRequest + from cm_python_openapi_sdk.models.general_job_request import GeneralJobRequest + + # Create an output directory if it doesn't exist + os.makedirs(output_path, exist_ok=True) + logger.debug(f"Ensured output directory exists: {output_path}") + + # Prepare the job request + data_dump_request = DataDumpRequest( + dataset=dataset + ) + + job_request = DataDumpJobRequest( + type="dataDump", + projectId=project_id, + content=data_dump_request + ) + + submit_request = GeneralJobRequest(actual_instance=job_request) + + # Submit the job + logger.debug("Submitting data dump job") + job_response = self.jobs_api.submit_job_execution(submit_request) + job_id = job_response.id + logger.debug(f"Job submitted with ID: {job_id}") + + # Poll for job completion + logger.debug("Polling for job completion") + self._wait_for_job_completion(job_id, poll_interval) + + # Get the result file URL and download + job_status = self.jobs_api.get_job_status(job_id, 
type="dataDump") + result_url = job_status.result.get("links")[0].get("href") + + if not result_url: + error_msg = "No result file URL found in job response" + logger.error(error_msg) + raise Exception(error_msg) + + logger.debug(f"Got result file URL: {result_url}") + + # Download the file + local_filename = os.path.join(output_path, f"{dataset}.csv") + self._download_file(result_url, local_filename) + + logger.debug(f"File downloaded successfully: {local_filename}") + return local_filename + + def _wait_for_job_completion( + self, + job_id: str, + poll_interval: int = 5, + timeout: Optional[int] = None + ) -> None: + """ + Poll for job completion until it succeeds or fails. + + Args: + job_id: The ID of the job to monitor + poll_interval: Seconds between status checks + timeout: Maximum seconds to wait (None for no timeout) + + Raises: + Exception: If the job fails + TimeoutError: If timeout is reached + """ + start_time = time.time() + + while True: + job_status = self.jobs_api.get_job_status(job_id, type="dataDump") + logger.debug(f"Current job status: {job_status.status}") + + if job_status.status == "SUCCEEDED": + logger.debug("Job completed successfully") + return + elif job_status.status == "FAILED": + error_msg = f"Job failed: {job_status.error}" + logger.error(error_msg) + raise Exception(error_msg) + + # Check timeout + if timeout and (time.time() - start_time) > timeout: + raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds") + + time.sleep(poll_interval) + + def _download_file(self, result_url: str, local_filename: str) -> None: + """ + Download a file from the API. 
+ + Args: + result_url: Relative URL path to the file + local_filename: Local path where to save the file + + Raises: + Exception: If the download fails + """ + logger.debug(f"Downloading file to: {local_filename}") + + # Get the full URL by combining the base URL with the relative path + # Avoid duplicating /rest if result_url already contains it + full_url = self.api_client.configuration.host + result_url[5:] # Remove /rest from result_url + logger.debug(f"Full URL: {full_url}") + + try: + # Get the authorization header from the API client configuration + headers = { + 'Authorization': f"Bearer {self.api_client.configuration.access_token}" + } + response = requests.get(full_url, headers=headers) + response.raise_for_status() + + with open(local_filename, 'wb') as f: + f.write(response.content) + + logger.debug(f"File downloaded successfully: {local_filename}") + + except Exception as e: + error_msg = f"Failed to download file: {str(e)}" + logger.error(error_msg) + raise Exception(error_msg) + + def get_job_status(self, job_id: str): + """ + Get the current status of a data dump job. + + Args: + job_id: The ID of the job + + Returns: + Job status response + """ + return self.jobs_api.get_job_status(job_id, type="dataDump") diff --git a/cm_python_highlevel_clients/load_data_client.py b/cm_python_highlevel_clients/load_data_client.py new file mode 100644 index 0000000..3681d31 --- /dev/null +++ b/cm_python_highlevel_clients/load_data_client.py @@ -0,0 +1,358 @@ +# clients/load_data_client.py +import os +import requests +import logging +import time +import gzip +from typing import Optional +from . import BaseClient + +logger = logging.getLogger(__name__) + + +class LoadDataClient(BaseClient): + """ + High-level client for data loading operations. 
+ + Provides convenient methods for: + - Uploading CSV files (single-part and multipart) + - Automatic file splitting for large files + - Job monitoring and status tracking + """ + + def __init__(self, api_token: str, host: Optional[str] = None, chunk_size: int = 20 * 1024 * 1024): + """ + Initialize the LoadData client. + + Args: + api_token: API access token (required) + host: API host URL (optional, uses default if not provided) + chunk_size: Size in bytes for multipart upload chunks (default: 20MB) + """ + super().__init__(api_token, host) + self.chunk_size = chunk_size + self.target_part_size = 20 * 1024 * 1024 # 20MB target part size + self.content_type = 'text/csv; charset=utf-8' + + def upload_data( + self, + project_id: str, + csv_file_path: str, + dataset_name: str, + poll_job: bool = True, + poll_interval: int = 5 + ): + """ + Upload a CSV file to the platform. + + Automatically chooses between single-part and multipart upload + based on file size. + + Args: + project_id: The ID of the project to upload to + csv_file_path: Path to the CSV file + dataset_name: Name of the dataset + poll_job: Whether to wait for job completion (default: True) + poll_interval: Seconds between status checks if polling (default: 5) + + Returns: + Job response object + + Raises: + FileNotFoundError: If the CSV file doesn't exist + Exception: For API-related errors + """ + if not os.path.exists(csv_file_path): + raise FileNotFoundError(f"CSV file not found: {csv_file_path}") + + file_size = os.path.getsize(csv_file_path) + logger.debug(f"File size: {file_size} bytes") + + # Choose upload method based on file size + if file_size <= self.chunk_size: + logger.debug("Using single-part upload") + job_response = self._single_part_upload(project_id, csv_file_path, dataset_name) + else: + # Calculate number of parts based on target part size + parts = (file_size + self.target_part_size - 1) // self.target_part_size + logger.debug(f"Using multipart upload with {parts} parts") + 
job_response = self._multipart_upload(project_id, csv_file_path, parts, dataset_name) + + # Poll for job completion if requested + if poll_job: + success = self.poll_job_status(job_response.id, poll_interval) + if not success: + raise Exception(f"Job {job_response.id} failed") + + return job_response + + def _single_part_upload(self, project_id: str, csv_file_path: str, dataset_name: str): + """ + Handle single-part upload for smaller files. + + Args: + project_id: Project ID + csv_file_path: Path to CSV file + dataset_name: Dataset name + + Returns: + Job response + """ + from cm_python_openapi_sdk.models.data_pull_job_request import DataPullJobRequest + from cm_python_openapi_sdk.models.data_pull_request import DataPullRequest + from cm_python_openapi_sdk.models.general_job_request import GeneralJobRequest + from cm_python_openapi_sdk.models.data_pull_request_csv_options import DataPullRequestCsvOptions + + # Initialize upload + logger.debug(f"Initializing upload for project {project_id}") + upload_response = self.data_upload_api.data_upload(project_id=project_id) + logger.debug(f"Upload response received") + + # Get the upload URL + upload_url = upload_response.actual_instance.upload_url_encoded + logger.debug(f"Got upload URL") + + # Upload the file + self._upload_file(upload_url, csv_file_path) + + # Create and submit data pull job + data_pull_request = DataPullRequest( + dataset=dataset_name, + mode="full", + type="csv", + upload=upload_response.actual_instance.links[1]["href"], + csvOptions=DataPullRequestCsvOptions() + ) + + job_request = DataPullJobRequest( + type="dataPull", + projectId=project_id, + content=data_pull_request + ) + + submit_request = GeneralJobRequest(actual_instance=job_request) + job_response = self.jobs_api.submit_job_execution(submit_request) + logger.debug(f"Job submitted with ID: {job_response.id}") + + return job_response + + def _multipart_upload( + self, + project_id: str, + csv_file_path: str, + parts: int, + dataset_name: str + 
): + """ + Handle multipart upload for larger files. + + Args: + project_id: Project ID + csv_file_path: Path to CSV file + parts: Number of parts to split into + dataset_name: Dataset name + + Returns: + Job response + """ + from cm_python_openapi_sdk.models.data_pull_job_request import DataPullJobRequest + from cm_python_openapi_sdk.models.data_pull_request import DataPullRequest + from cm_python_openapi_sdk.models.general_job_request import GeneralJobRequest + from cm_python_openapi_sdk.models.data_pull_request_csv_options import DataPullRequestCsvOptions + from cm_python_openapi_sdk.models.data_complete_multipart_upload_request import DataCompleteMultipartUploadRequest + from utils.csv_file_splitter import CSVFileSplitter + + # Initialize multipart upload + upload_response = self.data_upload_api.data_upload(project_id=project_id, parts=parts) + + # Get the upload URLs + upload_urls = upload_response.actual_instance.upload_urls_encoded + + uploaded_parts = [] + + # Use CSVFileSplitter to split and upload + file_splitter = CSVFileSplitter(self.chunk_size) + with file_splitter as splitter: + for temp_file_path, part_number in splitter.split_file(csv_file_path, parts): + try: + logger.debug(f"Uploading part {part_number}/{parts}") + etag = self._upload_part(upload_urls[part_number - 1], temp_file_path) + uploaded_parts.append({ + "eTag": etag, + "partNumber": part_number + }) + except Exception as e: + logger.error(f"Failed to upload part {part_number}: {e}") + raise + + # Complete the multipart upload + complete_request = DataCompleteMultipartUploadRequest( + id=upload_response.actual_instance.id, + uploadId=upload_response.actual_instance.upload_id, + partETags=uploaded_parts + ) + + complete_response = self.data_upload_api.complete_multipart_upload( + project_id=project_id, + id=upload_response.actual_instance.id, + data_complete_multipart_upload_request=complete_request + ) + + # Create and submit data pull job + data_pull_request = DataPullRequest( + 
dataset=dataset_name, + mode="full", + type="csv", + upload=upload_response.actual_instance.links[1]["href"], + csvOptions=DataPullRequestCsvOptions() + ) + + job_request = DataPullJobRequest( + type="dataPull", + projectId=project_id, + content=data_pull_request + ) + + submit_request = GeneralJobRequest(actual_instance=job_request) + job_response = self.jobs_api.submit_job_execution(submit_request) + logger.debug(f"Job submitted with ID: {job_response.id}") + + return job_response + + def _upload_file(self, url: str, csv_file_path: str) -> None: + """ + Upload a file to a presigned URL. + + Args: + url: Presigned URL + csv_file_path: Path to the file to upload + + Raises: + requests.exceptions.RequestException: If upload fails + """ + logger.debug(f"Uploading file to presigned URL") + + try: + with open(csv_file_path, 'rb') as f: + file_size = os.path.getsize(csv_file_path) + logger.debug(f"File size: {file_size} bytes") + + headers = { + 'Content-Type': self.content_type, + 'Content-Length': str(file_size) + } + + response = requests.put(url, data=f, headers=headers) + + if response.status_code != 200: + logger.error(f"Upload failed with status {response.status_code}") + logger.error(f"Response content: {response.text}") + + response.raise_for_status() + + except Exception as e: + logger.error(f"Error during file upload: {str(e)}") + raise + + def _upload_part(self, url: str, file_path: str) -> str: + """ + Upload a GZIP compressed part to presigned URL. 
+ + Args: + url: Presigned URL + file_path: Path to the file to upload + + Returns: + ETag from the response + + Raises: + requests.exceptions.RequestException: If upload fails + """ + logger.debug(f"Uploading GZIP compressed part") + + try: + # Read and compress the file content + with open(file_path, 'rb') as f: + file_content = f.read() + + compressed_content = gzip.compress(file_content) + compressed_size = len(compressed_content) + logger.debug( + f"Original size: {len(file_content)} bytes, " + f"Compressed size: {compressed_size} bytes" + ) + + headers = { + 'Content-Type': self.content_type, + 'Content-Length': str(compressed_size) + } + + response = requests.put(url, data=compressed_content, headers=headers) + + if response.status_code != 200: + logger.error(f"Upload failed with status {response.status_code}") + logger.error(f"Response content: {response.text}") + + response.raise_for_status() + + # Get ETag from response + etag = response.headers.get('ETag') + if not etag: + raise ValueError("No ETag received in response") + + return etag + + except Exception as e: + logger.error(f"Error during part upload: {str(e)}") + raise + + def poll_job_status(self, job_id: str, poll_interval: int = 5, timeout: Optional[int] = None) -> bool: + """ + Poll the status of a job until it completes or fails. 
+ + Args: + job_id: The ID of the job to poll + poll_interval: Seconds between status checks + timeout: Maximum seconds to wait (None for no timeout) + + Returns: + True if job succeeded, False if it failed + + Raises: + TimeoutError: If timeout is reached + """ + start_time = time.time() + + while True: + try: + job_status = self.jobs_api.get_job_status(job_id, "dataPull") + logger.debug(f"Job status: {job_status.status}") + + if job_status.status == "SUCCEEDED": + logger.info("Job completed successfully") + return True + elif job_status.status == "FAILED": + logger.error(f"Job failed: {job_status.message}") + return False + + # Check timeout + if timeout and (time.time() - start_time) > timeout: + raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds") + + time.sleep(poll_interval) + + except Exception as e: + logger.error(f"Error polling job status: {str(e)}") + raise + + def get_job_status(self, job_id: str): + """ + Get the current status of a data pull job. + + Args: + job_id: The ID of the job + + Returns: + Job status response + """ + return self.jobs_api.get_job_status(job_id, type="dataPull") diff --git a/cm_python_highlevel_clients/utils/__init__.py b/cm_python_highlevel_clients/utils/__init__.py new file mode 100644 index 0000000..65fe4a9 --- /dev/null +++ b/cm_python_highlevel_clients/utils/__init__.py @@ -0,0 +1,9 @@ +from .base_client import BaseClient +from .data_dump_client import DataDumpClient +from .load_data_client import LoadDataClient + +__all__ = [ + 'BaseClient', + 'DataDumpClient', + 'LoadDataClient', +] \ No newline at end of file diff --git a/cm_python_highlevel_clients/utils/csv_file_splitter.py b/cm_python_highlevel_clients/utils/csv_file_splitter.py new file mode 100644 index 0000000..d8a5b39 --- /dev/null +++ b/cm_python_highlevel_clients/utils/csv_file_splitter.py @@ -0,0 +1,110 @@ +import logging +import os +import tempfile +from typing import Tuple, Iterator + +logging.basicConfig(level=logging.DEBUG) +logger 
= logging.getLogger(__name__) + + +class CSVFileSplitter: + def __init__(self, chunk_size: int): + """Initialize the CSV file splitter. + + Args: + chunk_size (int): Target size for each chunk in bytes + """ + self.chunk_size = chunk_size + self.temp_dir = None + + def __enter__(self): + """Create temporary directory when entering context.""" + self.temp_dir = tempfile.mkdtemp(prefix='csv_split_') + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Clean up temporary directory when exiting context.""" + if self.temp_dir and os.path.exists(self.temp_dir): + for file in os.listdir(self.temp_dir): + try: + os.remove(os.path.join(self.temp_dir, file)) + except Exception as e: + logger.warning(f"Failed to remove temporary file {file}: {e}") + try: + os.rmdir(self.temp_dir) + except Exception as e: + logger.warning(f"Failed to remove temporary directory: {e}") + + def split_file(self, file_path: str, num_parts: int) -> Iterator[Tuple[str, int]]: + """Split a CSV file into parts while preserving CSV structure. + Each part is saved as a temporary file. 
+ + Args: + file_path (str): Path to the CSV file + num_parts (int): Number of parts to split into + + Yields: + Tuple[str, int]: Tuple of (temp_file_path, part_number) + """ + if not self.temp_dir: + raise RuntimeError("CSVFileSplitter must be used as a context manager") + + current_part = 1 + current_size = 0 + current_file = None + current_writer = None + + try: + with open(file_path, 'r', newline='') as f: + # Read header + header = next(f) + header_size = len(header.encode('utf-8')) + + # Create first part file + current_file = self._create_temp_file(current_part) + current_writer = open(current_file, 'w', newline='') + current_writer.write(header) + current_size = header_size + + # Read rows + for row in f: + row_size = len(row.encode('utf-8')) + + # If adding this row would exceed chunk size and we're not on the last part, + # close current file and start a new one + if current_size + row_size > self.chunk_size and current_part < num_parts: + current_writer.close() + yield current_file, current_part + + current_part += 1 + current_file = self._create_temp_file(current_part) + current_writer = open(current_file, 'w', newline='') + current_size = header_size + + current_writer.write(row) + current_size += row_size + + # Close the last file if it exists + if current_writer: + current_writer.close() + yield current_file, current_part + + except Exception as e: + # Clean up in case of error + if current_writer: + current_writer.close() + if current_file and os.path.exists(current_file): + os.remove(current_file) + raise e + + def _create_temp_file(self, part_number: int) -> str: + """Create a temporary file for a part. 
+ + Args: + part_number (int): The part number + + Returns: + str: Path to the created temporary file + """ + temp_file = os.path.join(self.temp_dir, f'part_{part_number}.csv') + return temp_file \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 6cbb2b9..49fe93f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ +requests urllib3 >= 2.1.0, < 3.0.0 python_dateutil >= 2.8.2 pydantic >= 2 diff --git a/test/highlevel_clients/__init__.py b/test/highlevel_clients/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/highlevel_clients/test_data_dump.py b/test/highlevel_clients/test_data_dump.py new file mode 100644 index 0000000..f83989b --- /dev/null +++ b/test/highlevel_clients/test_data_dump.py @@ -0,0 +1,35 @@ +from cm_python_highlevel_clients import DataDumpClient + + +def test_real_dump_dataset_to_csv(project_id: str, dataset: str, output_path: str, access_token: str): + """ + Test real data dump from CleverMaps project + + Args: + project_id (str): CleverMaps project ID + dataset (str): Dataset name to dump + output_path (str): Path where to save the CSV file + access_token (str): CleverMaps access token + """ + sdk = DataDumpClient(api_token=access_token) + + output_file = sdk.dump_dataset_to_csv(project_id, dataset, output_path) + + print(f"Data dump completed:") + print(f"Output file: {output_file}") + + with open(output_file, 'r') as f: + content = f.read() + print(f"File content length: {len(content)}") + + +if __name__ == "__main__": + # Replace these with your actual values + PROJECT_ID = "" # Replace with your CleverMaps project ID + ACCESS_TOKEN = "" + DATASET = "" # Replace with your dataset name + OUTPUT_PATH = "" # Replace with your desired output path + + # Test real data dump + print("\nTesting real data dump:") + test_real_dump_dataset_to_csv(PROJECT_ID, DATASET, OUTPUT_PATH, ACCESS_TOKEN) \ No newline at end of file diff --git a/test/highlevel_clients/test_data_load.py 
b/test/highlevel_clients/test_data_load.py new file mode 100644 index 0000000..5cae40a --- /dev/null +++ b/test/highlevel_clients/test_data_load.py @@ -0,0 +1,75 @@ +from cm_python_highlevel_clients.load_data_client import LoadDataClient + + +def test_single_part_upload(csv_path: str, project_id: str, access_token: str): + """ + Test single-part CSV upload (for files <= 50MB) + + Args: + csv_path (str): Path to the CSV file + project_id (str): CleverMaps project ID + config: Configured API client configuration + """ + + load_data_client = LoadDataClient(access_token) + + response = load_data_client.upload_data( + project_id=project_id, + csv_file_path=csv_path, + dataset_name="stores" + ) + + print(f"Single-part upload completed:") + print(f"Job ID: {response.id}") + print(f"Job Status: {response.status}") + + success = load_data_client.poll_job_status(response.id, poll_interval=5) + + if success: + print(f"Job {response.id} completed successfully") + else: + print(f"Job {response.id} failed") + + +def test_multipart_upload(csv_path: str, project_id: str, access_token: str): + """ + Test multipart CSV upload (for files > 50MB) + + Args: + csv_path (str): Path to the CSV file + project_id (str): CleverMaps project ID + config: Configured API client configuration + """ + load_data_client = LoadDataClient(access_token) + + response = load_data_client.upload_data( + project_id=project_id, + csv_file_path=csv_path, + dataset_name="zsj_d_dwh" + ) + + print(f"Multipart upload completed:") + print(f"Job ID: {response.id}") + print(f"Job Status: {response.status}") + + success = load_data_client.poll_job_status(response.id, poll_interval=5) + + if success: + print(f"Job {response.id} completed successfully") + else: + print(f"Job {response.id} failed") + + +if __name__ == "__main__": + PROJECT_ID = "" # Replace with your CleverMaps project ID + ACCESS_TOKEN = "" + DATASET = "" # Replace with your dataset name + CSV_PATH = "" # Replace with your desired output path + 
CSV_PATH_MULTIPART = "" + # Test single-part upload + print("\nTesting single-part upload:") + test_single_part_upload(CSV_PATH, PROJECT_ID, ACCESS_TOKEN) + + # Test multipart upload + print("\nTesting multipart upload:") + test_multipart_upload(CSV_PATH_MULTIPART, PROJECT_ID, ACCESS_TOKEN) \ No newline at end of file From 69ebac0980bde36a016d3932db25eb1c08d51f2e Mon Sep 17 00:00:00 2001 From: Daniel Valansky Date: Thu, 23 Oct 2025 16:11:20 +0200 Subject: [PATCH 2/2] Disabled tests for high-level clients --- test/highlevel_clients/test_data_dump.py | 23 ++++++++++-------- test/highlevel_clients/test_data_load.py | 31 +++++++++++++----------- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/test/highlevel_clients/test_data_dump.py b/test/highlevel_clients/test_data_dump.py index f83989b..2392b88 100644 --- a/test/highlevel_clients/test_data_dump.py +++ b/test/highlevel_clients/test_data_dump.py @@ -1,6 +1,8 @@ -from cm_python_highlevel_clients import DataDumpClient +import pytest +from cm_python_highlevel_clients import DataDumpClient +@pytest.mark.skip(reason="Integration test - run manually with actual credentials") def test_real_dump_dataset_to_csv(project_id: str, dataset: str, output_path: str, access_token: str): """ Test real data dump from CleverMaps project @@ -24,12 +26,13 @@ def test_real_dump_dataset_to_csv(project_id: str, dataset: str, output_path: st if __name__ == "__main__": - # Replace these with your actual values - PROJECT_ID = "" # Replace with your CleverMaps project ID - ACCESS_TOKEN = "" - DATASET = "" # Replace with your dataset name - OUTPUT_PATH = "" # Replace with your desired output path - - # Test real data dump - print("\nTesting real data dump:") - test_real_dump_dataset_to_csv(PROJECT_ID, DATASET, OUTPUT_PATH, ACCESS_TOKEN) \ No newline at end of file + # # Replace these with your actual values + # PROJECT_ID = "" # Replace with your CleverMaps project ID + # ACCESS_TOKEN = "" + # DATASET = "" # Replace with your 
dataset name + # OUTPUT_PATH = "" # Replace with your desired output path + # + # # Test real data dump + # print("\nTesting real data dump:") + # #test_real_dump_dataset_to_csv(PROJECT_ID, DATASET, OUTPUT_PATH, ACCESS_TOKEN) + pass \ No newline at end of file diff --git a/test/highlevel_clients/test_data_load.py b/test/highlevel_clients/test_data_load.py index 5cae40a..d7b46da 100644 --- a/test/highlevel_clients/test_data_load.py +++ b/test/highlevel_clients/test_data_load.py @@ -1,6 +1,8 @@ -from cm_python_highlevel_clients.load_data_client import LoadDataClient +import pytest +from cm_python_highlevel_clients.load_data_client import LoadDataClient +@pytest.mark.skip(reason="Integration test - run manually with actual credentials") def test_single_part_upload(csv_path: str, project_id: str, access_token: str): """ Test single-part CSV upload (for files <= 50MB) @@ -30,7 +32,7 @@ def test_single_part_upload(csv_path: str, project_id: str, access_token: str): else: print(f"Job {response.id} failed") - +@pytest.mark.skip(reason="Integration test - run manually with actual credentials") def test_multipart_upload(csv_path: str, project_id: str, access_token: str): """ Test multipart CSV upload (for files > 50MB) @@ -61,15 +63,16 @@ def test_multipart_upload(csv_path: str, project_id: str, access_token: str): if __name__ == "__main__": - PROJECT_ID = "" # Replace with your CleverMaps project ID - ACCESS_TOKEN = "" - DATASET = "" # Replace with your dataset name - CSV_PATH = "" # Replace with your desired output path - CSV_PATH_MULTIPART = "" - # Test single-part upload - print("\nTesting single-part upload:") - test_single_part_upload(CSV_PATH, PROJECT_ID, ACCESS_TOKEN) - - # Test multipart upload - print("\nTesting multipart upload:") - test_multipart_upload(CSV_PATH_MULTIPART, PROJECT_ID, ACCESS_TOKEN) \ No newline at end of file + # PROJECT_ID = "" # Replace with your CleverMaps project ID + # ACCESS_TOKEN = "" + # DATASET = "" # Replace with your dataset name + # 
CSV_PATH = "" # Replace with your desired output path + # CSV_PATH_MULTIPART = "" + # # Test single-part upload + # print("\nTesting single-part upload:") + # #test_single_part_upload(CSV_PATH, PROJECT_ID, ACCESS_TOKEN) + # + # # Test multipart upload + # print("\nTesting multipart upload:") + # #test_multipart_upload(CSV_PATH_MULTIPART, PROJECT_ID, ACCESS_TOKEN) + pass \ No newline at end of file