diff --git a/nexla_sdk/resources/flows.py b/nexla_sdk/resources/flows.py index 4576191..43160a1 100644 --- a/nexla_sdk/resources/flows.py +++ b/nexla_sdk/resources/flows.py @@ -1,12 +1,15 @@ from typing import Any, Dict, List, Optional, Union +from nexla_sdk.models.destinations.requests import DestinationUpdate from nexla_sdk.models.flows.requests import FlowCopyOptions +from nexla_sdk.models.projects.requests import ProjectFlowList from nexla_sdk.models.flows.responses import ( DocsRecommendation, FlowLogsResponse, FlowMetricsApiResponse, FlowResponse, ) +from nexla_sdk.models.sources.requests import SourceUpdate from nexla_sdk.resources.base_resource import BaseResource @@ -168,6 +171,102 @@ def copy( """ return super().copy(flow_id, options) + def copy_and_replace_credentials( + self, + flow_id: int, + resource_credential_mapping: Dict[int, int], + copy_options: Optional[FlowCopyOptions] = None, + target_project_id: Optional[int] = None, + ) -> FlowResponse: + """Copy a flow and replace credentials on specified resources. + + This convenience method copies a flow with ``reuse_data_credentials=True`` + so that all copied resources initially keep their original credentials, + then updates the credentials for the resources listed in the mapping. + + The mapping keys are **original** resource IDs (source or sink IDs from + the original flow). After the copy, the method looks up the + corresponding *copied* resources (via their ``copied_from_id`` field) + and updates each one's ``data_credentials_id`` to the value specified + in the mapping. + + Resources whose original IDs are **not** present in the mapping are + left untouched and keep whatever credential they were copied with. + + Args: + flow_id: The ID of the flow to copy. + resource_credential_mapping: A dict mapping original resource IDs + to the new credential IDs that should be set on the copied + versions of those resources. + ``{original_resource_id: new_credential_id}`` + copy_options: Optional additional copy options. The + ``reuse_data_credentials`` flag will always be forced to + ``True`` regardless of the value passed here. + target_project_id: Optional project ID to move the copied flow + into. When set, the copied flow is added to the specified + project after credential replacement. + + Returns: + The copied ``FlowResponse`` with credentials updated. + + Examples: + Copy a flow and replace the credential on the source and one sink:: + + copied = client.flows.copy_and_replace_credentials( + flow_id=100, + resource_credential_mapping={ + 500: 20, # source 500 -> cred 20 + 600: 30, # sink 600 -> cred 30 + }, + ) + """ + # Build copy options, always forcing reuse_data_credentials=True + if copy_options is None: + copy_options = FlowCopyOptions(reuse_data_credentials=True) + else: + copy_options = copy_options.model_copy( + update={"reuse_data_credentials": True} + ) + + # Step 1: Copy the flow + copied_flow = self.copy(flow_id, copy_options) + + # Step 2: Update credentials on copied resources that match the mapping + if copied_flow.data_sources: + for source in copied_flow.data_sources: + if ( + source.copied_from_id is not None + and source.copied_from_id in resource_credential_mapping + ): + new_cred_id = resource_credential_mapping[source.copied_from_id] + self.client.sources.update( + source.id, + SourceUpdate(data_credentials_id=new_cred_id), + ) + + if copied_flow.data_sinks: + for sink in copied_flow.data_sinks: + if ( + sink.copied_from_id is not None + and sink.copied_from_id in resource_credential_mapping + ): + new_cred_id = resource_credential_mapping[sink.copied_from_id] + self.client.destinations.update( + sink.id, + DestinationUpdate(data_credentials_id=new_cred_id), + ) + + # Step 3: Optionally move the copied flow into a target project + origin_node_id = copied_flow.flows[0].origin_node_id + if target_project_id is not None: + self.client.projects.add_flows( + target_project_id, + ProjectFlowList(flows=[origin_node_id]), + ) + + # Step 4: Re-fetch the flow to return an up-to-date response + return self.get(origin_node_id) + def delete(self, flow_id: int) -> Dict[str, Any]: """ Delete flow. diff --git a/tests/unit/test_flows.py b/tests/unit/test_flows.py index dc5f9c8..397d616 100644 --- a/tests/unit/test_flows.py +++ b/tests/unit/test_flows.py @@ -681,3 +681,407 @@ def test_get_metrics_all_parameters(self, mock_client, mock_http_client): assert last_request["params"]["orderby"] == "created_at" assert last_request["params"]["page"] == 2 assert last_request["params"]["per_page"] == 100 + + def test_copy_and_replace_credentials_source_and_sink( + self, mock_client, mock_http_client, mock_factory + ): + """Test copy_and_replace_credentials updates source and sink credentials.""" + # Arrange + original_source_id = 500 + original_sink_id = 600 + copied_source_id = 501 + copied_sink_id = 601 + new_source_cred = 20 + new_sink_cred = 30 + origin_node_id = 9999 + + # Build a copy response with copied_from_id set + copy_response = { + "flows": [ + mock_factory.create_mock_flow_node( + origin_node_id=origin_node_id, + ) + ], + "data_sources": [ + mock_factory.create_mock_source( + id=copied_source_id, + copied_from_id=original_source_id, + data_credentials_id=10, + ) + ], + "data_sinks": [ + mock_factory.create_mock_destination( + id=copied_sink_id, + copied_from_id=original_sink_id, + data_credentials_id=10, + ) + ], + } + + # Build a re-fetch response (after credential updates) + refetch_response = { + "flows": [ + mock_factory.create_mock_flow_node( + origin_node_id=origin_node_id, + ) + ], + "data_sources": [ + mock_factory.create_mock_source( + id=copied_source_id, + copied_from_id=original_source_id, + data_credentials_id=new_source_cred, + ) + ], + "data_sinks": [ + mock_factory.create_mock_destination( + id=copied_sink_id, + copied_from_id=original_sink_id, + data_credentials_id=new_sink_cred, + ) + ], + } + + # Updated source response + updated_source = mock_factory.create_mock_source( + id=copied_source_id, + data_credentials_id=new_source_cred, + ) + + # Updated sink response + updated_sink = mock_factory.create_mock_destination( + id=copied_sink_id, + data_credentials_id=new_sink_cred, + ) + + mock_http_client.add_response("/flows/100/copy", copy_response) + mock_http_client.add_response( + f"/data_sources/{copied_source_id}", updated_source + ) + mock_http_client.add_response(f"/data_sinks/{copied_sink_id}", updated_sink) + mock_http_client.add_response( + f"/flows/{origin_node_id}", refetch_response + ) + + # Act + result = mock_client.flows.copy_and_replace_credentials( + flow_id=100, + resource_credential_mapping={ + original_source_id: new_source_cred, + original_sink_id: new_sink_cred, + }, + ) + + # Assert + assert isinstance(result, FlowResponse) + + # Verify copy request used reuse_data_credentials=True + copy_requests = mock_http_client.get_requests_by_url_pattern("/flows/100/copy") + assert len(copy_requests) == 1 + assert copy_requests[0]["json"]["reuse_data_credentials"] is True + + # Verify source update request + source_updates = mock_http_client.get_requests_by_url_pattern( + f"/data_sources/{copied_source_id}" + ) + put_source = [r for r in source_updates if r["method"] == "PUT"] + assert len(put_source) == 1 + assert put_source[0]["json"]["data_credentials_id"] == new_source_cred + + # Verify sink update request + sink_updates = mock_http_client.get_requests_by_url_pattern( + f"/data_sinks/{copied_sink_id}" + ) + put_sink = [r for r in sink_updates if r["method"] == "PUT"] + assert len(put_sink) == 1 + assert put_sink[0]["json"]["data_credentials_id"] == new_sink_cred + + def test_copy_and_replace_credentials_partial_mapping( + self, mock_client, mock_http_client, mock_factory + ): + """Test that resources not in the mapping are left untouched.""" + # Arrange + original_source_id = 500 + original_sink_a_id = 600 + original_sink_b_id = 700 + copied_source_id = 501 + copied_sink_a_id = 601 + copied_sink_b_id = 701 + new_cred = 20 + origin_node_id = 9999 + + copy_response = { + "flows": [ + mock_factory.create_mock_flow_node(origin_node_id=origin_node_id) + ], + "data_sources": [ + mock_factory.create_mock_source( + id=copied_source_id, + copied_from_id=original_source_id, + data_credentials_id=10, + ) + ], + "data_sinks": [ + mock_factory.create_mock_destination( + id=copied_sink_a_id, + copied_from_id=original_sink_a_id, + data_credentials_id=10, + ), + mock_factory.create_mock_destination( + id=copied_sink_b_id, + copied_from_id=original_sink_b_id, + data_credentials_id=50, + ), + ], + } + + refetch_response = { + "flows": [ + mock_factory.create_mock_flow_node(origin_node_id=origin_node_id) + ], + "data_sources": [ + mock_factory.create_mock_source( + id=copied_source_id, + copied_from_id=original_source_id, + data_credentials_id=new_cred, + ) + ], + "data_sinks": [ + mock_factory.create_mock_destination( + id=copied_sink_a_id, + copied_from_id=original_sink_a_id, + data_credentials_id=new_cred, + ), + mock_factory.create_mock_destination( + id=copied_sink_b_id, + copied_from_id=original_sink_b_id, + data_credentials_id=50, + ), + ], + } + + updated_source = mock_factory.create_mock_source( + id=copied_source_id, data_credentials_id=new_cred + ) + updated_sink_a = mock_factory.create_mock_destination( + id=copied_sink_a_id, data_credentials_id=new_cred + ) + + mock_http_client.add_response("/flows/100/copy", copy_response) + mock_http_client.add_response( + f"/data_sources/{copied_source_id}", updated_source + ) + mock_http_client.add_response( + f"/data_sinks/{copied_sink_a_id}", updated_sink_a + ) + mock_http_client.add_response( + f"/flows/{origin_node_id}", refetch_response + ) + + # Act — only map source and sink A, leave sink B untouched + result = mock_client.flows.copy_and_replace_credentials( + flow_id=100, + resource_credential_mapping={ + original_source_id: new_cred, + original_sink_a_id: new_cred, + # original_sink_b_id intentionally NOT mapped + }, + ) + + # Assert + assert isinstance(result, FlowResponse) + + # Sink B should NOT have been updated (no PUT to its endpoint) + sink_b_requests = mock_http_client.get_requests_by_url_pattern( + f"/data_sinks/{copied_sink_b_id}" + ) + put_sink_b = [r for r in sink_b_requests if r["method"] == "PUT"] + assert len(put_sink_b) == 0 + + def test_copy_and_replace_credentials_preserves_copy_options( + self, mock_client, mock_http_client, mock_factory + ): + """Test that extra copy options are preserved and reuse_data_credentials is forced True.""" + # Arrange + origin_node_id = 9999 + copy_response = { + "flows": [ + mock_factory.create_mock_flow_node(origin_node_id=origin_node_id) + ], + "data_sources": [], + "data_sinks": [], + } + refetch_response = copy_response.copy() + + mock_http_client.add_response("/flows/100/copy", copy_response) + mock_http_client.add_response( + f"/flows/{origin_node_id}", refetch_response + ) + + # Act — pass copy_options with reuse_data_credentials=False (should be overridden) + result = mock_client.flows.copy_and_replace_credentials( + flow_id=100, + resource_credential_mapping={}, + copy_options=FlowCopyOptions( + reuse_data_credentials=False, + copy_access_controls=True, + owner_id=42, + ), + ) + + # Assert + assert isinstance(result, FlowResponse) + + copy_requests = mock_http_client.get_requests_by_url_pattern("/flows/100/copy") + assert len(copy_requests) == 1 + # reuse_data_credentials should be forced True + assert copy_requests[0]["json"]["reuse_data_credentials"] is True + # Other options should be preserved + assert copy_requests[0]["json"]["copy_access_controls"] is True + assert copy_requests[0]["json"]["owner_id"] == 42 + + def test_copy_and_replace_credentials_empty_mapping( + self, mock_client, mock_http_client, mock_factory + ): + """Test with empty mapping — just copies the flow with no credential changes.""" + # Arrange + origin_node_id = 9999 + copy_response = { + "flows": [ + mock_factory.create_mock_flow_node(origin_node_id=origin_node_id) + ], + "data_sources": [ + mock_factory.create_mock_source( + id=501, copied_from_id=500, data_credentials_id=10 + ) + ], + "data_sinks": [ + mock_factory.create_mock_destination( + id=601, copied_from_id=600, data_credentials_id=10 + ) + ], + } + refetch_response = copy_response.copy() + + mock_http_client.add_response("/flows/100/copy", copy_response) + mock_http_client.add_response( + f"/flows/{origin_node_id}", refetch_response + ) + + # Act + result = mock_client.flows.copy_and_replace_credentials( + flow_id=100, + resource_credential_mapping={}, + ) + + # Assert — no PUT requests should have been made + assert isinstance(result, FlowResponse) + put_requests = mock_http_client.get_requests_by_method("PUT") + assert len(put_requests) == 0 + + def test_copy_and_replace_credentials_with_target_project( + self, mock_client, mock_http_client, mock_factory + ): + """Test that target_project_id moves the copied flow into the project.""" + # Arrange + original_source_id = 500 + copied_source_id = 501 + new_cred = 20 + origin_node_id = 9999 + target_project_id = 42 + + copy_response = { + "flows": [ + mock_factory.create_mock_flow_node(origin_node_id=origin_node_id) + ], + "data_sources": [ + mock_factory.create_mock_source( + id=copied_source_id, + copied_from_id=original_source_id, + data_credentials_id=10, + ) + ], + "data_sinks": [], + } + + updated_source = mock_factory.create_mock_source( + id=copied_source_id, data_credentials_id=new_cred + ) + + # add_flows response (list of project data flows) + add_flows_response = [ + {"id": 1, "project_id": target_project_id, "flow_node_id": origin_node_id} + ] + + refetch_response = { + "flows": [ + mock_factory.create_mock_flow_node(origin_node_id=origin_node_id) + ], + "data_sources": [ + mock_factory.create_mock_source( + id=copied_source_id, + copied_from_id=original_source_id, + data_credentials_id=new_cred, + ) + ], + "data_sinks": [], + } + + mock_http_client.add_response("/flows/100/copy", copy_response) + mock_http_client.add_response( + f"/data_sources/{copied_source_id}", updated_source + ) + mock_http_client.add_response( + f"/projects/{target_project_id}/flows", add_flows_response + ) + mock_http_client.add_response( + f"/flows/{origin_node_id}", refetch_response + ) + + # Act + result = mock_client.flows.copy_and_replace_credentials( + flow_id=100, + resource_credential_mapping={original_source_id: new_cred}, + target_project_id=target_project_id, + ) + + # Assert + assert isinstance(result, FlowResponse) + + # Verify add_flows was called with the correct project and flow + project_requests = mock_http_client.get_requests_by_url_pattern( + f"/projects/{target_project_id}/flows" + ) + put_project = [r for r in project_requests if r["method"] == "PUT"] + assert len(put_project) == 1 + assert put_project[0]["json"]["flows"] == [origin_node_id] + + def test_copy_and_replace_credentials_no_target_project( + self, mock_client, mock_http_client, mock_factory + ): + """Test that no project request is made when target_project_id is None.""" + # Arrange + origin_node_id = 9999 + copy_response = { + "flows": [ + mock_factory.create_mock_flow_node(origin_node_id=origin_node_id) + ], + "data_sources": [], + "data_sinks": [], + } + refetch_response = copy_response.copy() + + mock_http_client.add_response("/flows/100/copy", copy_response) + mock_http_client.add_response( + f"/flows/{origin_node_id}", refetch_response + ) + + # Act — no target_project_id + result = mock_client.flows.copy_and_replace_credentials( + flow_id=100, + resource_credential_mapping={}, + ) + + # Assert — no project requests should have been made + assert isinstance(result, FlowResponse) + project_requests = mock_http_client.get_requests_by_url_pattern("/projects/") + assert len(project_requests) == 0