Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions rust/crates/sift_cli/src/cli/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ pub struct ExportArgs {
#[arg(long)]
pub channel_id: Vec<String>,

/// Regular expression used to filter calculated channels to include in the export
#[arg(long)]
pub calculated_channel_regex: Option<String>,

/// Name of calculated channel to include in the export; can be specified multiple times
#[arg(long)]
pub calculated_channel: Vec<String>,

/// ID of calculated channel to include in the export; can be specified multiple times
#[arg(long)]
pub calculated_channel_id: Vec<String>,

/// Start time in RFC 3339 format (required for asset exports)
#[arg(long)]
pub start: Option<String>,
Expand Down
113 changes: 48 additions & 65 deletions rust/crates/sift_cli/src/cmd/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ use zip::ZipArchive;
use crate::{
cli::{ExportAssetArgs, ExportRunArgs},
util::{
api::create_grpc_channel, channel::filter_channels, job::JobServiceWrapper,
progress::Spinner, tty::Output,
api::create_grpc_channel,
calculated_channel::{ResolveScope, resolve_calculated_channels},
channel::resolve_channel_ids,
job::JobServiceWrapper,
progress::Spinner,
tty::Output,
},
};

Expand Down Expand Up @@ -85,40 +89,25 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result<ExitCode> {
run.name.clone().yellow()
));
}
let asset_ids_cel = run
.asset_ids
.iter()
.map(|a| format!("'{a}'"))
.collect::<Vec<String>>()
.join(",");

let mut channel_ids = args.common.channel_id;

if !args.common.channel.is_empty() {
let channel_names_cel = args
.common
.channel
.iter()
.map(|c| format!("'{c}'"))
.collect::<Vec<String>>()
.join(",");

let filter = format!("asset_id in [{asset_ids_cel}] && name in [{channel_names_cel}]");
let query_res = filter_channels(grpc_channel.clone(), &filter).await?;

for channel in query_res {
channel_ids.push(channel.channel_id);
}
}

if let Some(re) = args.common.channel_regex {
let filter = format!("asset_id in [{asset_ids_cel}] && name.matches(\"{re}\")");
let query_res = filter_channels(grpc_channel.clone(), &filter).await?;

for channel in query_res {
channel_ids.push(channel.channel_id);
}
}
let channel_ids = resolve_channel_ids(
grpc_channel.clone(),
&args.common.channel,
args.common.channel_regex.as_deref(),
args.common.channel_id,
&run.asset_ids,
)
.await?;

let scope = ResolveScope::Run(&run.run_id);
let calculated_channel_configs = resolve_calculated_channels(
grpc_channel.clone(),
&args.common.calculated_channel,
args.common.calculated_channel_regex.as_deref(),
&args.common.calculated_channel_id,
&run.asset_ids,
&scope,
)
.await?;

let start_time = args
.common
Expand All @@ -144,6 +133,7 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result<ExitCode> {

let export_req = ExportDataRequest {
channel_ids,
calculated_channel_configs,
output_format: ExportOutputFormat::from(args.common.format).into(),
time_selection: Some(TimeSelection::RunsAndTimeRange(RunsAndTimeRange {
start_time,
Expand Down Expand Up @@ -194,44 +184,37 @@ pub async fn asset(ctx: Context, args: ExportAssetArgs) -> Result<ExitCode> {
.into_inner();

if assets.is_empty() {
return Err(anyhow!("no run found"));
return Err(anyhow!("no asset found"));
}
let asset = assets.first().unwrap();
let asset_id = &asset.asset_id;

let mut channel_ids = args.common.channel_id;

if !args.common.channel.is_empty() {
let channel_names_cel = args
.common
.channel
.iter()
.map(|c| format!("'{c}'"))
.collect::<Vec<String>>()
.join(",");

let filter = format!("asset_id == '{asset_id}' && name in [{channel_names_cel}]");
let query_res = filter_channels(grpc_channel.clone(), &filter).await?;

for channel in query_res {
channel_ids.push(channel.channel_id);
}
}

if let Some(re) = args.common.channel_regex {
let filter = format!("asset_id == '{asset_id}' && name.matches(\"{re}\")");
let query_res = filter_channels(grpc_channel.clone(), &filter).await?;

for channel in query_res {
channel_ids.push(channel.channel_id);
}
}
let asset_ids = vec![asset_id.to_string()];
let channel_ids = resolve_channel_ids(
grpc_channel.clone(),
&args.common.channel,
args.common.channel_regex.as_deref(),
args.common.channel_id,
&asset_ids,
)
.await?;
let scope = ResolveScope::Assets(&asset_ids);
let calculated_channel_configs = resolve_calculated_channels(
grpc_channel.clone(),
&args.common.calculated_channel,
args.common.calculated_channel_regex.as_deref(),
&args.common.calculated_channel_id,
&asset_ids,
&scope,
)
.await?;

let export_req = ExportDataRequest {
channel_ids,
Comment thread
nathan-sift marked this conversation as resolved.
calculated_channel_configs,
output_format: ExportOutputFormat::from(args.common.format).into(),
time_selection: Some(TimeSelection::AssetsAndTimeRange(AssetsAndTimeRange {
asset_ids: vec![asset_id.to_string()],
asset_ids,
start_time: Some(start_time),
stop_time: Some(stop_time),
})),
Expand Down
Loading
Loading