
feat: _inputs/custom_feeds/ auto-detected drop-in ingestion (#139) #149

Merged
yohei1126 merged 1 commit into main from feat/custom-feeds-drop-in on Apr 9, 2026


@yohei1126 (Contributor)

Closes #139

Summary

  • src/ingest/custom_feeds.py: auto-detects the feed type from the column signature (SAR / cargo / sanctions / AIS), falls back to the filename prefix (sar_*, cargo_*, manifest_*, sanctions_*, ais_*), and routes each CSV to the correct DuckDB table. An optional .columnmap.json sidecar lets AIS feeds with non-standard column names be mapped without code changes.
  • scripts/run_pipeline.py: new step_custom_feeds, inserted as step 5 of 10 (after sanctions, before the ownership graph). It also runs in every cadence rescore loop, so files dropped in while the pipeline is running are picked up automatically.
  • scripts/run_operations_shell.sh: job 15, an interactive prompt for the feeds directory, with a dry-run option and optional feature-matrix and scoring steps.
  • docs/pipeline-operations.md: new "Custom feed drop-ins" section with an interface-contract table (required and optional columns per feed type), the .columnmap.json format, and standalone CLI usage.
  • tests/test_custom_feeds.py: 26 unit tests covering all four feed types, the column-map sidecar, dry-run, skipping of unknown schemas, deduplication, and edge cases.
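The detection order described above (column signature first, then filename prefix) can be sketched roughly as follows. This is a minimal illustration, not the code in src/ingest/custom_feeds.py: the names REQUIRED, PREFIXES, DETECTION_ORDER and detect_feed_type are hypothetical, the signatures are copied from the required-columns table in this PR, and case-insensitive matching with first-match-wins ordering are assumptions.

```python
from __future__ import annotations

from pathlib import Path

# Hypothetical signature table: each feed type lists its required columns,
# and each entry is a set of accepted (lowercased) aliases. Taken from the
# PR's "Required columns per feed type" table.
REQUIRED: dict[str, list[set[str]]] = {
    "sar": [{"lat"}, {"lon"}, {"detected_at"}],
    "cargo": [{"reporter"}, {"partner"}, {"hs_code"}, {"period"}],
    "sanctions": [{"name"}, {"list_source"}],
    "ais": [{"mmsi"}, {"lat"}, {"lon"}, {"timestamp", "basedatetime"}],
}

# Filename-prefix fallback, as listed in the PR summary.
PREFIXES: dict[str, str] = {
    "sar_": "sar",
    "cargo_": "cargo",
    "manifest_": "cargo",
    "sanctions_": "sanctions",
    "ais_": "ais",
}

# Signatures are checked in this order; a file that satisfies several
# signatures takes the first match (assumed ordering).
DETECTION_ORDER = ["sar", "cargo", "sanctions", "ais"]


def matches(columns: set[str], required: list[set[str]]) -> bool:
    """True if every required column (or one of its aliases) is present."""
    return all(aliases & columns for aliases in required)


def detect_feed_type(columns: list[str], filename: str) -> str | None:
    """Return the feed type for a CSV, or None if it should be skipped."""
    cols = {c.strip().lower() for c in columns}
    for feed in DETECTION_ORDER:
        if matches(cols, REQUIRED[feed]):
            return feed
    # No signature matched: fall back to the filename prefix.
    name = Path(filename).name.lower()
    for prefix, feed in PREFIXES.items():
        if name.startswith(prefix):
            return feed
    return None  # unknown schema
```

With this sketch, an export with MMSI, LAT, LON and BaseDateTime headers resolves to "ais" by signature, while sar_2024_01.csv with unrecognized headers falls back to "sar" via its prefix. Anything else returns None and would be skipped.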

Required columns per feed type

| Feed type | Required columns | Target table |
|---|---|---|
| AIS positions | mmsi/MMSI, lat/LAT, lon/LON, timestamp/BaseDateTime | ais_positions |
| SAR detections | lat, lon, detected_at | sar_detections |
| Cargo manifest | reporter, partner, hs_code, period | trade_flow |
| Custom sanctions | name, list_source | sanctions_entities |
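Routing a detected file to its target table, with deduplication and a dry-run mode, might look roughly like this. The sketch uses Python's built-in sqlite3 as a stand-in for DuckDB so it runs without extra dependencies. The ingest_csv name, the TABLES mapping, and treating all required columns as the uniqueness key are assumptions for illustration (AIS is omitted for brevity), not the PR's actual behavior.

```python
from __future__ import annotations

import csv
import io
import sqlite3

# Target table and dedup key per feed type. Table names come from the PR;
# using the required columns as the uniqueness key is an assumption.
TABLES: dict[str, tuple[str, list[str]]] = {
    "sar": ("sar_detections", ["lat", "lon", "detected_at"]),
    "cargo": ("trade_flow", ["reporter", "partner", "hs_code", "period"]),
    "sanctions": ("sanctions_entities", ["name", "list_source"]),
}


def ingest_csv(conn: sqlite3.Connection, feed: str, text: str, dry_run: bool = False) -> int:
    """Insert CSV rows into the feed's table, skipping duplicates.

    Returns how many rows were new (or would be new, when dry_run is True).
    """
    table, cols = TABLES[feed]  # identifiers come from the fixed dict above, not user input
    col_list = ", ".join(cols)
    # Python's default sqlite3 transaction handling does not open a transaction
    # before DDL, so outside a transaction a dry run may leave an empty table.
    conn.execute(f"CREATE TABLE IF NOT EXISTS {table} ({col_list}, PRIMARY KEY ({col_list}))")
    rows = [tuple(r[c] for c in cols) for r in csv.DictReader(io.StringIO(text))]
    placeholders = ", ".join("?" for _ in cols)
    before = conn.total_changes
    # The primary key turns repeated rows (within this file or from earlier loads) into no-ops.
    conn.executemany(f"INSERT OR IGNORE INTO {table} VALUES ({placeholders})", rows)
    added = conn.total_changes - before
    if dry_run:
        conn.rollback()  # discard the inserts, keep only the count
    else:
        conn.commit()
    return added
```

Calling ingest_csv twice with the same file returns 0 the second time, which is the kind of idempotence a cadence loop that rescans the drop-in directory would rely on.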

Test plan

  • uv run pytest tests/test_custom_feeds.py — 26 passed
  • uv run ruff check src/ingest/custom_feeds.py tests/test_custom_feeds.py scripts/run_pipeline.py — clean
  • CI green

🤖 Generated with Claude Code

Drop any CSV into _inputs/custom_feeds/ and it is auto-routed to the
correct DuckDB table on the next pipeline run — no code changes required.

- src/ingest/custom_feeds.py: feed-type detection by column signature
  (sar/cargo/sanctions/ais) with filename-prefix fallback; per-type
  ingestors; optional .columnmap.json sidecar for AIS feeds
- scripts/run_pipeline.py: new step_custom_feeds (step 5 of 10) between
  sanctions loading and ownership graph; also included in cadence rescore loop
- scripts/run_operations_shell.sh: job 15 — interactive custom feed drop-in
- docs/pipeline-operations.md: custom feeds section with interface contract
  (required/optional columns per feed type), sidecar format, standalone usage
- tests/test_custom_feeds.py: 26 unit tests covering all 4 feed types,
  column-map sidecar, dry-run, unknown schema, deduplication, edge cases

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
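For AIS feeds whose headers do not match the contract, the .columnmap.json sidecar supplies the renaming. The sidecar format is not spelled out here, so this sketch assumes a flat JSON object mapping source column names to canonical ones, stored as <stem>.columnmap.json next to <stem>.csv. load_column_map, normalize_ais_columns and AIS_ALIASES are hypothetical names.

```python
from __future__ import annotations

import json
from pathlib import Path

# Canonical AIS columns and the header aliases accepted for each (lowercased),
# taken from the PR's required-columns table.
AIS_ALIASES: dict[str, set[str]] = {
    "mmsi": {"mmsi"},
    "lat": {"lat"},
    "lon": {"lon"},
    "timestamp": {"timestamp", "basedatetime"},
}


def load_column_map(csv_path: Path) -> dict[str, str]:
    """Read the optional sidecar <stem>.columnmap.json next to <stem>.csv.

    Assumed format: a flat JSON object {"source_column": "canonical_column"}.
    Returns an empty dict when no sidecar exists.
    """
    sidecar = csv_path.with_name(csv_path.stem + ".columnmap.json")
    if not sidecar.exists():
        return {}
    return json.loads(sidecar.read_text(encoding="utf-8"))


def normalize_ais_columns(columns: list[str], column_map: dict[str, str]) -> dict[str, str]:
    """Return {original_header: canonical_column} for the required AIS fields.

    The sidecar mapping is applied first, then aliases are matched
    case-insensitively. Extra columns are ignored; missing ones raise.
    """
    mapping: dict[str, str] = {}
    for col in columns:
        target = column_map.get(col, col).strip().lower()
        for canonical, aliases in AIS_ALIASES.items():
            if target in aliases and canonical not in mapping.values():
                mapping[col] = canonical
                break
    missing = sorted(set(AIS_ALIASES) - set(mapping.values()))
    if missing:
        raise ValueError(f"AIS feed is missing required columns: {missing}")
    return mapping
```

Applying the sidecar before alias matching means a feed with headers such as vessel_id, latitude, longitude and ts only needs a four-entry JSON file, while standard exports need no sidecar at all.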
yohei1126 merged commit 6ac0c0d into main on Apr 9, 2026
5 checks passed
yohei1126 deleted the feat/custom-feeds-drop-in branch on April 9, 2026 at 15:02


Development

Closes issue #139: feat: _inputs/custom_feeds/ auto-detected drop-in ingestion for proprietary data
