Description
SQLMesh Version: 0.174.2 (I believe the problem persists in newer versions as well)
Dialect: Trino
State: Postgres
I created a custom materialization to override the default use of MERGE for INCREMENTAL_BY_UNIQUE_KEY in Trino for performance reasons.
I did want to preserve the sequential behaviour, though, so I created a custom model kind for it.
The custom classes live in the same project as the models. I've hit the error both in unit tests and when running in Airflow; when running `sqlmesh run` locally, the error does not occur.
```
../.venv/lib/python3.12/site-packages/sqlmesh/core/context.py:444: in __init__
    self.load()
../.venv/lib/python3.12/site-packages/sqlmesh/core/context.py:589: in load
    loaded_projects = [loader.load() for loader in self._loaders]
../.venv/lib/python3.12/site-packages/sqlmesh/core/loader.py:126: in load
    models = self._load_models(
../.venv/lib/python3.12/site-packages/sqlmesh/core/loader.py:428: in _load_models
    sql_models = self._load_sql_models(macros, jinja_macros, audits, signals, cache)
../.venv/lib/python3.12/site-packages/sqlmesh/core/loader.py:489: in _load_sql_models
    for model in cache.get_or_load_models(path, _load):
../.venv/lib/python3.12/site-packages/sqlmesh/core/loader.py:760: in get_or_load_models
    models = self._model_cache.get_or_load(
../.venv/lib/python3.12/site-packages/sqlmesh/core/model/cache.py:70: in get_or_load
    self._file_cache.put(name, entry_id, value=models)

    def put(self, name: str, entry_id: str = "", *, value: T) -> None:
        """Stores the given value in the cache.

        Args:
            name: The name of the entry.
            entry_id: The unique entry identifier. Used for cache invalidation.
            value: The value to store in the cache.
        """
        self._path.mkdir(parents=True, exist_ok=True)
        if not self._path.is_dir():
            raise SQLMeshError(f"Cache path '{self._path}' is not a directory.")
        with gzip.open(self._cache_entry_path(name, entry_id), "wb", compresslevel=1) as fd:
>           pickle.dump(value, fd)
E           _pickle.PicklingError: Can't pickle <class 'materializations.delsert.DelSertKind'>: it's not the same object as materializations.delsert.DelSertKind
```
Pickle serializes classes by reference: it looks the class up by module and qualified name and requires that lookup to return the very same object. Because the class object held in the cache is not identical to the one currently bound in `materializations.delsert`, the identity check fails with the error above.
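The identity check is easy to reproduce outside SQLMesh. This sketch (names are illustrative, not SQLMesh's) rebinds a module-level class name, which leaves the old class object un-picklable: the module and qualified name still match, but the lookup now returns a different object.

```python
import pickle


class DelSertKind:  # stand-in for the custom model kind
    pass


OriginalKind = DelSertKind


class DelSertKind:  # rebinding simulates the module being loaded a second time
    pass


# pickle resolves the module's *current* DelSertKind, which is not OriginalKind.
try:
    pickle.dumps(OriginalKind)
except pickle.PicklingError as exc:
    print(exc)  # e.g. "... it's not the same object as __main__.DelSertKind"
```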
If I remove the custom model kind, the error disappears. I have also tried setting MAX_FORK_WORKERS=1 but this didn't help.
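The "different class object" situation is exactly what arises when the defining module is executed more than once, e.g. by a loader that runs project files directly. A minimal, SQLMesh-independent reproduction (file and module names here are made up for illustration):

```python
import importlib.util
import pathlib
import sys
import tempfile

# Write a tiny module to disk, then execute it twice under different
# module names, as a file-based project loader might.
with tempfile.TemporaryDirectory() as tmp:
    path = pathlib.Path(tmp) / "delsert.py"
    path.write_text("class DelSertKind:\n    pass\n")

    def load_as(name: str):
        spec = importlib.util.spec_from_file_location(name, path)
        module = importlib.util.module_from_spec(spec)
        sys.modules[name] = module
        spec.loader.exec_module(module)
        return module

    first = load_as("delsert_first")
    second = load_as("delsert_second")

    # Same source file, yet two distinct class objects -- the situation
    # that trips pickle's identity check.
    print(first.DelSertKind is second.DelSertKind)  # False
```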
There is a workaround I can already employ: import the custom materialization type cache and clear it before each run. I've only got one custom materialization so far, so this isn't a big deal.
```python
from airflow.exceptions import AirflowException
from sqlmesh import Context

# SQLMESH_PROJECT_PATH and CompletionStatus are defined/imported elsewhere in the DAG.


def run_model(model_name: str) -> None:
    """Will run a specific model."""
    from sqlmesh.core.snapshot import evaluator

    # Temporary workaround: the cache holds a stale class object for the
    # custom materialization, so clear it before building the context.
    evaluator._custom_materialization_type_cache.clear()

    sqlmesh_context = Context(paths=SQLMESH_PROJECT_PATH)
    completion_status: CompletionStatus = sqlmesh_context.run(
        skip_janitor=True,
        select_models=[model_name],
    )
    if completion_status.is_failure:
        raise AirflowException(f"SQLMesh run failed for model {model_name}")
```