Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions nexla_sdk/resources/flows.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
from typing import Any, Dict, List, Optional, Union

from nexla_sdk.models.destinations.requests import DestinationUpdate
from nexla_sdk.models.flows.requests import FlowCopyOptions
from nexla_sdk.models.projects.requests import ProjectFlowList
from nexla_sdk.models.flows.responses import (
DocsRecommendation,
FlowLogsResponse,
FlowMetricsApiResponse,
FlowResponse,
)
from nexla_sdk.models.sources.requests import SourceUpdate
from nexla_sdk.resources.base_resource import BaseResource


Expand Down Expand Up @@ -168,6 +171,102 @@ def copy(
"""
return super().copy(flow_id, options)

def copy_and_replace_credentials(
self,
flow_id: int,
resource_credential_mapping: Dict[int, int],
copy_options: Optional[FlowCopyOptions] = None,
target_project_id: Optional[int] = None,
) -> FlowResponse:
"""Copy a flow and replace credentials on specified resources.

This convenience method copies a flow with ``reuse_data_credentials=True``
so that all copied resources initially keep their original credentials,
then updates the credentials for the resources listed in the mapping.

The mapping keys are **original** resource IDs (source or sink IDs from
the original flow). After the copy, the method looks up the
corresponding *copied* resources (via their ``copied_from_id`` field)
and updates each one's ``data_credentials_id`` to the value specified
in the mapping.

Resources whose original IDs are **not** present in the mapping are
left untouched and keep whatever credential they were copied with.

Args:
flow_id: The ID of the flow to copy.
resource_credential_mapping: A dict mapping original resource IDs
to the new credential IDs that should be set on the copied
versions of those resources.
``{original_resource_id: new_credential_id}``
copy_options: Optional additional copy options. The
``reuse_data_credentials`` flag will always be forced to
``True`` regardless of the value passed here.
target_project_id: Optional project ID to move the copied flow
into. When set, the copied flow is added to the specified
project after credential replacement.

Returns:
The copied ``FlowResponse`` with credentials updated.

Examples:
Copy a flow and replace the credential on the source and one sink::

copied = client.flows.copy_and_replace_credentials(
flow_id=100,
resource_credential_mapping={
500: 20, # source 500 -> cred 20
600: 30, # sink 600 -> cred 30
},
)
"""
# Build copy options, always forcing reuse_data_credentials=True
if copy_options is None:
copy_options = FlowCopyOptions(reuse_data_credentials=True)
else:
copy_options = copy_options.model_copy(
update={"reuse_data_credentials": True}
)

# Step 1: Copy the flow
copied_flow = self.copy(flow_id, copy_options)

# Step 2: Update credentials on copied resources that match the mapping
if copied_flow.data_sources:
for source in copied_flow.data_sources:
if (
source.copied_from_id is not None
and source.copied_from_id in resource_credential_mapping
):
new_cred_id = resource_credential_mapping[source.copied_from_id]
self.client.sources.update(
source.id,
SourceUpdate(data_credentials_id=new_cred_id),
)

if copied_flow.data_sinks:
for sink in copied_flow.data_sinks:
if (
sink.copied_from_id is not None
and sink.copied_from_id in resource_credential_mapping
):
new_cred_id = resource_credential_mapping[sink.copied_from_id]
self.client.destinations.update(
sink.id,
DestinationUpdate(data_credentials_id=new_cred_id),
)

# Step 3: Optionally move the copied flow into a target project
origin_node_id = copied_flow.flows[0].origin_node_id
if target_project_id is not None:
self.client.projects.add_flows(
target_project_id,
ProjectFlowList(flows=[origin_node_id]),
)

# Step 4: Re-fetch the flow to return an up-to-date response
return self.get(origin_node_id)

def delete(self, flow_id: int) -> Dict[str, Any]:
"""
Delete flow.
Expand Down
Loading
Loading