Conversation
…e spatial index, lazy data loading
- Pose stored as 7 real columns (x/y/z + quaternion) instead of blob, enabling R*Tree spatial indexing
- Payload moved to separate {name}_payload table with lazy loading via _data_loader closure
- R*Tree virtual table created per stream for .near() bounding-box queries
- Added __iter__ to Stream for lazy iteration via fetch_pages
- Added embedding_stream() to Session ABC
- Updated _streams metadata with parent_stream and embedding_dim columns
- Codec module extracted (LcmCodec, PickleCodec, codec_for_type)
- Fixed broken memory_old.timeseries imports (memory.timeseries → memory_old.timeseries)
- Tests now use real Image data from TimedSensorReplay("unitree_go2_bigoffice/video")
- 32/32 tests passing, mypy clean
…dowTransformer, E2E test - Add JpegCodec as default codec for Image types (2.76MB → 64KB per frame) - Preserve frame_id in JPEG header; ts stored in meta table - Add ingest() helper for bulk-loading (ts, payload) iterables into streams - Add QualityWindowTransformer: best-frame-per-window (supports backfill + live) - EmbeddingTransformer sets output_type=Embedding automatically - Require payload_type when creating new streams (no silent PickleCodec fallback) - TransformStream.store() accepts payload_type, propagated through materialize_transform - E2E test: 5min video → sharpness filter → CLIP embed → text search - Move test_sqlite.py next to sqlite.py, update Image comparisons for lossy codec - Add sqlite-vec dependency
…rojection - Add parent_id to Observation, append(), do_append(), and _META_COLS - All transformers (PerItem, QualityWindow, Embedding) pass obs.id as parent_id - SqliteEmbeddingBackend._row_to_obs() wires _source_data_loader via parent_id - EmbeddingObservation.data now auto-projects to parent stream's payload (e.g. Image) - No more timestamp-matching hacks to find source data from embedding results
- materialize_transform() now UPDATEs _streams.parent_stream so stream-level lineage is discoverable (prerequisite for .join()) - Fix mypy: narrow parent_table type in _source_loader closure - Add plans/memory/tasks.md documenting all spec-vs-impl gaps
Adds LineageFilter that compiles to nested SQL subqueries walking the parent_id chain. project_to(target) returns a chainable target Stream using the same _with_filter mechanism as .after(), .near(), etc. Also fixes _session propagation in search_embedding/search_text.
EmbeddingStream is a semantic index — search results should be source observations (Images), not Embedding objects. search_embedding now auto-projects via project_to when lineage exists, falling back to EmbeddingStream for standalone streams without parent lineage.
- Add CaptionTransformer: wraps Captioner/VlModel, uses caption_batch() for backfill efficiency, auto-creates TextStream with FTS on .store() - Fix Florence2 caption_batch() emitting <pad> tokens (skip_special_tokens) - E2E script now uses transform pipeline for captioning search results
fetch() now returns ObservationSet instead of plain list, keeping you in the Stream API. This enables fork-and-zip (one DB query, two uses) and in-memory re-filtering without re-querying the database. - Add matches(obs) to all filter dataclasses for in-Python evaluation - Add ListBackend (in-memory StreamBackend) and ObservationSet class - Filtered .appended reactive subscription via matches() infrastructure - Update e2e export script to use fork-and-zip pattern - 20 new tests (64 total, all passing)
EmbeddingStream now holds an optional model reference, so search_embedding auto-dispatches: str → embed_text(), image → embed(), Embedding/list[float] → use directly. The model is wired through materialize_transform and also accepted via embedding_stream().
- Fix SpatialImage/SpatialEntry dataclass hierarchy in memory_old - Fix import path in memory_old/test_embedding.py - Add None guard for obs.ts in run_viz_demo.py - Add payload_type/session kwargs to base Stream.store() signature - Type-annotate embeddings as EmbeddingStream in run_e2e_export.py - Add similarity scores, raw search mode, pose ingest, viz pipeline
- Normalize similarity scores relative to min/max (CLIP clusters in narrow band) - Add distance_transform_edt spread so dots radiate outward, fading to 0 - Bump default search k to 200 for denser heatmaps
- Validate stream names and tag keys as SQL identifiers
- Allowlist order_by fields to {id, ts}
- Re-sort vector search results by distance rank after IN-clause fetch
- Make TagsFilter hashable (tuple of pairs instead of dict)
- Remove dead code in memory_old/embedding.py
- Add scipy-stubs, fix distance_transform_edt type annotations
- Add dimos/memory/rerun.py: to_rerun() sends stream data to Rerun with auto-derived entity paths and no wall-clock timeline contamination - Fix Stream.fetch_pages() to respect limit_val (was always overridden by batch_size, making .limit() ineffective during iteration) - Update viz.py: normalize similarities with 20% floor cutoff, sort timeline by timestamp, add log_top_images() - Convert run_e2e_export.py to pytest with cached DB fixture - Update plans/memory docs to match current implementation
…, fix mypy - Rename to test_e2e_export.py (it's a pytest file, not a standalone script) - Fix Generator return type and type: ignore for mypy - Delete viz.py (replaced by rerun.py) and run_viz_demo.py - Update docs/api.md to reference rerun.py instead of viz.py
…ad reduction - Switch JpegCodec from cv2.imencode to TurboJPEG (2-5x faster encode/decode) - Lower default JPEG quality from 90 to 50 for smaller storage footprint - Downscale sharpness computation to 160px Laplacian variance (10-20x cheaper) - Add MemoryModule with plain-Python sharpness windowing (no rx timer overhead) - Limit OpenCV threads: 2 globally in worker entrypoint, 1 in MemoryModule - Cap global rx ThreadPoolScheduler at 8 workers (was unbounded cpu_count) - Refactor SqliteEmbeddingBackend/SqliteTextBackend to use _post_insert hook - Encode payload before meta insert to prevent orphaned rows on codec error - Add `dimos ps` CLI command and `dps` entrypoint for non-interactive process listing - Add unitree-go2-memory blueprint
All store-related code now lives under store/: base class in base.py, MemoryStore in memory.py, SqliteStore in sqlite.py. store/__init__.py re-exports public API. Also renamed test_impl.py → test_store.py.
Subdirectory __init__.py files in memory2/ were re-exporting symbols from their submodules. Replace all imports with direct module paths (e.g. utils.sqlite.open_sqlite_connection instead of utils) and empty out the __init__.py files.
Replace _migrate_or_create with CREATE TABLE IF NOT EXISTS.
Replace start()/try/finally/stop() with `with` statements.
No code imports from package-level; all use direct module paths. Python 3.3+ implicit namespace packages make these unnecessary.
Migrate BlobStore, VectorStore, ObservationStore, Notifier, and RegistryStore to use the Configurable[ConfigT] mixin pattern, matching the existing Store class. Runtime deps (conn, codec) use Field(exclude=True) so serialize()/model_dump() skips them. All call sites updated to keyword args.
Centralizes the pattern of opening a SQLite connection paired with a disposable that closes it, replacing manual Disposable(lambda: conn.close()) at each call site.
# Conflicts: # dimos/__init__.py
The test_detection3dpc test fails intermittently in full suite runs due to non-deterministic point cloud boundary values.
| except (ImportError, RuntimeError): | ||
| return None |
There was a problem hiding this comment.
This seems like a silent test failure. Why would we fail to import?
| pass | ||
|
|
||
|
|
||
| class SubjectNotifier(Notifier[T], Generic[T]): |
There was a problem hiding this comment.
I believe this is redundant:
| class SubjectNotifier(Notifier[T], Generic[T]): | |
| class SubjectNotifier(Notifier[T]): |
| ``stream`` is the raw stream name (for R*Tree table references). | ||
| ``prefix`` is a column qualifier (e.g. ``"meta."`` for JOIN queries). | ||
| """ | ||
| if isinstance(f, AfterFilter): |
There was a problem hiding this comment.
Do we need to handle subclasses here? If not, then it might make sense to disallow them with @final and turn this into a mapping for better efficiency?
{AfterFilter: lambda f, p: f"...", ...}[type(f)](f, filter)
| r = f.radius | ||
| # R*Tree bounding-box pre-filter + exact squared-distance check | ||
| rtree_sql = ( | ||
| f'{prefix}id IN (SELECT id FROM "{stream}_rtree" ' |
There was a problem hiding this comment.
Also, shouldn't we use sqlalchemy here? These strings are all highly vulnerable to SQL injection if there's any possibility of an attacker controlling the inputs.
| f'CREATE TABLE IF NOT EXISTS "{self._name}" (' | ||
| " id INTEGER PRIMARY KEY AUTOINCREMENT," | ||
| " ts REAL NOT NULL UNIQUE," | ||
| " pose_x REAL, pose_y REAL, pose_z REAL," | ||
| " pose_qx REAL, pose_qy REAL, pose_qz REAL, pose_qw REAL," | ||
| " tags BLOB DEFAULT (jsonb('{}'))" |
There was a problem hiding this comment.
What if we need to change the table layout in future? Do we just throw away the data, or do we have DB migration in place? Again, if we use sqlalchemy, we can get this from alembic easily.
| return len(self._buf) | ||
|
|
||
|
|
||
| class Unbounded(BackpressureBuffer[T]): |
There was a problem hiding this comment.
Don't all these classes already exist in stdlib?
Certainly in asyncio, we have several of these type of queues:
https://docs.python.org/3/library/asyncio-queue.html#asyncio.Queue
|
|
||
|
|
||
| @pytest.fixture | ||
| def memory_store() -> Generator[MemoryStore, None, None]: |
There was a problem hiding this comment.
It's very rare to need Generator, all these can be changed:
| def memory_store() -> Generator[MemoryStore, None, None]: | |
| def memory_store() -> Iterator[MemoryStore]: |
| store = FileBlobStore(root=str(tmp_path / "blobs")) | ||
| store.start() | ||
| yield store | ||
| store.stop() |
There was a problem hiding this comment.
Didn't these have context managers implemented?
| store = FileBlobStore(root=str(tmp_path / "blobs")) | |
| store.start() | |
| yield store | |
| store.stop() | |
| with FileBlobStore(root=str(tmp_path / "blobs")) as store: | |
| yield store |
| def _batched(it: Iterator[T], n: int) -> Iterator[list[T]]: | ||
| """Yield successive n-sized chunks from an iterator.""" | ||
| while True: | ||
| batch = list(islice(it, n)) | ||
| if not batch: | ||
| return | ||
| yield batch |
There was a problem hiding this comment.
| def _batched(it: Iterator[T], n: int) -> Iterator[list[T]]: | |
| """Yield successive n-sized chunks from an iterator.""" | |
| while True: | |
| batch = list(islice(it, n)) | |
| if not batch: | |
| return | |
| yield batch | |
| if sys.version_info >= (3, 12): | |
| from itertools import batched | |
| else: | |
| def batched(it: Iterator[T], n: int) -> Iterator[list[T]]: | |
| """Yield successive n-sized chunks from an iterator.""" | |
| while True: | |
| batch = list(islice(it, n)) | |
| if not batch: | |
| return | |
| yield batch |
|
|
||
| def __init__(self, **kwargs: Any) -> None: | ||
| super().__init__(**kwargs) | ||
| assert self.config.conn is not None, "conn is required" |
There was a problem hiding this comment.
Why allow None in the first place?
Summary
New memory subsystem (
memory2) — a lazy, pull-based stream engine for storing, querying, and transforming robot observations with full type safety.Key pieces:
Stream[T]/Observation[T]model with typed filters (TimeRange,NearFilter, spatial R*Tree index)ListBackendand fullSqliteBackend(JSONB metadata, vec0 vector search, R*Tree spatial index, cursor pagination)search_embedding(str | image)LiveChannelwith.observable()/.subscribe()Pluggable extension points:
Store(observation backend),LiveChannel(reactive pub/sub),BlobStore(binary data),EmbeddingStore(vector search) — each swappable independently.Documentation:
Breaking Changes
None —
memory2is a new parallel module; existingmemory/is untouched.How to Test
Contributor License Agreement