
@pankajastro
Member

@pankajastro pankajastro commented Mar 14, 2024

It looks like the XComObjectStoreBackend config variable names
in the docs are incorrect, or the names were updated in the code
while the docs still have the old ones. The implementation
uses *store*, but the docs use *storage*:

    def serialize_value(
        value: T,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> bytes | str:
        # we will always serialize ourselves and not by BaseXCom as the deserialize method
        # from BaseXCom accepts only XCom objects and not the value directly
        s_val = json.dumps(value, cls=XComEncoder).encode("utf-8")
        path = conf.get(SECTION, "xcom_objectstore_path", fallback="")
        compression = conf.get(SECTION, "xcom_objectstore_compression", fallback=None)
        if compression:
            suffix = "." + _get_compression_suffix(compression)
        else:
            suffix = ""
        threshold = conf.getint(SECTION, "xcom_objectstore_threshold", fallback=-1)
        if path and -1 < threshold < len(s_val):
            # safeguard against collisions
            while True:
                p = ObjectStoragePath(path) / f"{dag_id}/{run_id}/{task_id}/{str(uuid.uuid4())}{suffix}"
                if not p.exists():
                    break
            if not p.parent.exists():
                p.parent.mkdir(parents=True, exist_ok=True)
            with p.open(mode="wb", compression=compression) as f:
                f.write(s_val)
            return BaseXCom.serialize_value(str(p))
        else:
            return s_val
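
For readers skimming the excerpt, the size check that decides whether a value goes to object storage can be looked at in isolation. This is only a sketch: `should_offload` is a hypothetical helper, not part of the provider, and it mirrors just the `path and -1 < threshold < len(s_val)` condition.

```python
def should_offload(path: str, threshold: int, payload: bytes) -> bool:
    """Return True when the payload would be written to object storage.

    Hypothetical helper for illustration only. It mirrors the condition
    `path and -1 < threshold < len(s_val)` from the excerpt: a path must be
    configured, the threshold must be non-negative, and the serialized
    payload must be strictly larger than the threshold.
    """
    return bool(path) and -1 < threshold < len(payload)


payload = b"x" * 10  # stand-in for the JSON-serialized XCom value

offload_small_threshold = should_offload("file:///tmp/xcom", 5, payload)
offload_default_threshold = should_offload("file:///tmp/xcom", -1, payload)
offload_no_path = should_offload("", 5, payload)
offload_equal_size = should_offload("file:///tmp/xcom", 10, payload)
```

With the fallback threshold of -1, or with no path configured, the value stays in the metadata database; a payload exactly as large as the threshold also stays there, because the comparison is strict.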

Also, the tests hit a bug when xcom_objectstore_compression is an empty string, so I now set it to None when it is an empty string.
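
The empty-string handling can be sketched roughly as follows. This is an illustration under stated assumptions, not the merged change: it uses the standard library `ConfigParser` as a stand-in for Airflow's `conf`, `get_compression` and `parser_from` are hypothetical helpers, and the section name `common.io` is assumed.

```python
from __future__ import annotations

from configparser import ConfigParser

SECTION = "common.io"  # assumed section name, for illustration only
KEY = "xcom_objectstore_compression"


def get_compression(parser: ConfigParser) -> str | None:
    """Read the compression option, treating an empty string as unset."""
    raw = parser.get(SECTION, KEY, fallback=None)
    # An option written as `key =` is read back as "", which is falsy,
    # so `or None` collapses both "missing" and "empty" to None.
    return raw or None


def parser_from(text: str) -> ConfigParser:
    """Build a parser from an inline config string (hypothetical test helper)."""
    parser = ConfigParser()
    parser.read_string(text)
    return parser


empty = get_compression(parser_from("[common.io]\nxcom_objectstore_compression =\n"))
gzip_value = get_compression(parser_from("[common.io]\nxcom_objectstore_compression = gzip\n"))
missing = get_compression(parser_from(""))
```

Normalizing at read time means later code, such as the suffix computation and the `compression=` argument to `open`, only ever sees a real compression name or None.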



@pankajastro pankajastro merged commit 0df0e09 into apache:main Mar 14, 2024
@pankajastro pankajastro deleted the fix_object_storage_docs branch March 14, 2024 15:50
@ephraimbuddy ephraimbuddy added the changelog:skip Changes that should be skipped from the changelog (CI, tests, etc..) label Mar 14, 2024
@pankajastro pankajastro changed the title from "Fix XComObjectStoreBackend config var in docs" to "Fix XComObjectStoreBackend config var in docs and bug when xcom_objectstore_compression is empty" Mar 15, 2024
@bolkedebruin
Contributor

bolkedebruin commented Mar 19, 2024

Maybe it would have been better to change the implementation to use xcom_objectstorage_path instead of adopting a new name? That would make it consistent instead of deviating. "Storage" is used elsewhere, and this very subtle naming difference might not be caught.

@pankajastro
Member Author

Indeed, I noticed that throughout the codebase we've used the name objectstorage in various places. During testing, I discovered that the config variable name the implementation expects is objectstore rather than objectstorage, so I made the necessary changes. However, for consistency, I propose reverting to objectstorage, since we've not released it yet.

@pankajastro
Member Author

@bolkedebruin I've created PR #38415, PTAL.


Labels

area:providers, changelog:skip, kind:documentation, provider:common-io
