3 changes: 3 additions & 0 deletions .gitignore
@@ -128,6 +128,9 @@ dmypy.json
# Pyre type checker
.pyre/

# sphinx
doctest

# raw data
*.parquet
*.csv
14 changes: 14 additions & 0 deletions CITATION.cff
@@ -210,3 +210,17 @@ references:
- description: "Github Link with Contributors"
type: url
value: "https://github.com/WayScience/nf1_cellpainting_data/graphs/contributors"
- authors:
- name: "Pycytominer Team"
date-accessed: "2024-04-03"
title: Pycytominer DeepProfiler Test Data
type: data
repository-code: "https://github.com/cytomining/pycytominer"
url: "https://github.com/cytomining/pycytominer/tree/main/tests/test_data/DeepProfiler_example_data/"
notes: >-
DeepProfiler- and CellProfiler-generated data from the Pycytominer project are used to help
validate expected results for CytoTable.
identifiers:
- description: "Github Link with Contributors"
type: url
value: "https://github.com/cytomining/pycytominer/graphs/contributors"
159 changes: 107 additions & 52 deletions cytotable/convert.py
@@ -7,6 +7,7 @@
from typing import Any, Dict, List, Literal, Optional, Tuple, Union, cast

import parsl
import pyarrow as pa
from parsl.app.app import python_app

from cytotable.exceptions import CytoTableException
@@ -26,7 +27,7 @@
@python_app
def _get_table_columns_and_types(
source: Dict[str, Any], sort_output: bool
) -> List[Dict[str, str]]:
) -> List[Optional[Dict[str, str]]]:
"""
Gather column data from table through duckdb.

@@ -38,7 +39,7 @@ def _get_table_columns_and_types(
Specifies whether to sort cytotable output or not.

Returns:
List[Dict[str, str]]
List[Optional[Dict[str, str]]]
list of dictionaries which each include column level information
"""

@@ -49,6 +50,12 @@
source_path = source["source_path"]
source_type = str(source_path.suffix).lower()

# If we have .npz files, return a list with None
# because we're querying a non-tabular data source.
# These will be handled later by _extract_npz_to_parquet.
if source_type == ".npz":
return [None]

# prepare the data source in the form of a duckdb query
select_source = (
f"read_csv_auto('{source_path}')"
@@ -279,7 +286,9 @@ def _get_table_keyset_pagination_sets(
page_key: str,
source: Optional[Dict[str, Any]] = None,
sql_stmt: Optional[str] = None,
) -> Union[List[Tuple[Union[int, float], Union[int, float]]], None]:
) -> Union[
List[Optional[Tuple[Union[int, float], Union[int, float]]]], List[None], None
]:
"""
Get table data chunk keys for later use in capturing segments
of values. This work also provides a chance to catch problematic
@@ -300,7 +309,7 @@
data source.

Returns:
List[Any]
Union[List[Optional[Tuple[Union[int, float], Union[int, float]]]], None]
List of keys to use for reading the data later on.
"""

@@ -324,8 +333,15 @@
with _duckdb_reader() as ddb_reader:
if source_type == ".csv":
sql_query = f"SELECT {page_key} FROM read_csv_auto('{source_path}', header=TRUE, delim=',') ORDER BY {page_key}"
else:
elif source_type == ".sqlite":
sql_query = f"SELECT {page_key} FROM sqlite_scan('{source_path}', '{table_name}') ORDER BY {page_key}"
elif source_type == ".npz":
# If we have npz files there's no need to paginate,
# so we return [None]. A None within a list is used as
# a special "passthrough" case within the pipeline
# so that we may specially handle NPZ files later on via
# _source_pageset_to_parquet and _extract_npz_to_parquet.
return [None]

page_keys = [
results[0] for results in ddb_reader.execute(sql_query).fetchall()
@@ -360,14 +376,16 @@
page_keys = ddb_reader.execute(sql_query).fetchall()
page_keys = [key[0] for key in page_keys]

return _generate_pagesets(page_keys, chunk_size)
# The `type: ignore` comment below silences a mypy error
# in which it considers the return type of _generate_pagesets invalid.
return _generate_pagesets(page_keys, chunk_size) # type: ignore[return-value]
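_generate_pagesets is imported from elsewhere in the package and not shown in this diff; as a rough sketch, keyset pagination sets can be derived from the sorted keys by grouping them into inclusive (start, end) ranges, assuming chunk_size bounds the number of keys per range (a simplification, not necessarily the library's exact logic):

from typing import List, Tuple, Union

def generate_pagesets_sketch(
    keys: List[Union[int, float]], chunk_size: int
) -> List[Tuple[Union[int, float], Union[int, float]]]:
    # deduplicate and sort, then group into inclusive (start, end) ranges
    # covering at most chunk_size keys each
    sorted_keys = sorted(set(keys))
    return [
        (chunk[0], chunk[-1])
        for chunk in (
            sorted_keys[i : i + chunk_size]
            for i in range(0, len(sorted_keys), chunk_size)
        )
    ]

# e.g. generate_pagesets_sketch([1, 2, 3, 5, 8], chunk_size=2)
# -> [(1, 2), (3, 5), (8, 8)]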


@python_app
def _source_pageset_to_parquet(
source_group_name: str,
source: Dict[str, Any],
pageset: Tuple[Union[int, float], Union[int, float]],
pageset: Optional[Tuple[Union[int, float], Union[int, float]]],
dest_path: str,
sort_output: bool,
) -> str:
@@ -380,7 +398,7 @@ def _source_pageset_to_parquet(
source: Dict[str, Any]
Contains the source data to be chunked. Represents a single
file or table of some kind along with collected information about table.
pageset: Tuple[int, int]
pageset: Optional[Tuple[Union[int, float], Union[int, float]]]
The pageset for chunking the data from source.
dest_path: str
Path to store the output data.
@@ -399,17 +417,42 @@

from cytotable.utils import (
_duckdb_reader,
_extract_npz_to_parquet,
_sqlite_mixed_type_query_to_parquet,
_write_parquet_table_with_metadata,
)

source_type = str(source["source_path"].suffix).lower()

# attempt to build dest_path
source_dest_path = (
f"{dest_path}/{str(AnyPath(source_group_name).stem).lower()}/"
f"{str(source['source_path'].parent.name).lower()}"
)
pathlib.Path(source_dest_path).mkdir(parents=True, exist_ok=True)

# If we have npz files, we need to extract them in a specialized manner.
# See below for CSV and SQLite handling.
if source_type == ".npz":
return _extract_npz_to_parquet(
source_path=str(source["source_path"]),
dest_path=f"{source_dest_path}/{str(source['source_path'].stem)}.parquet",
tablenumber=source["tablenumber"],
)

elif pageset is None:
# A `None` pageset is only valid for npz sources:
# table queries require a pageset range to work with,
# and the npz `None` case is already handled above,
# so reaching this branch means something went wrong.
raise CytoTableException(
(
"No pageset range provided for source data"
" (required for non-NPZ datasets)."
)
)
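_extract_npz_to_parquet lives in cytotable.utils and is not shown in this diff; a minimal sketch of the general NPZ-to-Parquet idea, assuming the .npz archive holds equal-length, one-dimensional named arrays (the real helper handles DeepProfiler-specific structure beyond this):

import numpy as np
import pyarrow as pa
from pyarrow import parquet

def extract_npz_to_parquet_sketch(
    source_path: str, dest_path: str, tablenumber: int
) -> str:
    # load the named arrays from the NPZ archive as columns
    with np.load(source_path) as npz:
        columns = {name: npz[name] for name in npz.files}
    table = pa.Table.from_pydict(columns)
    # prepend a TableNumber column to keep rows distinct across files
    table = table.add_column(
        0, "TableNumber", pa.array([tablenumber] * table.num_rows)
    )
    parquet.write_table(table, dest_path)
    return dest_path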

# build tablenumber segment addition (if necessary)
tablenumber_sql = (
# to become tablenumber in sql select later with bigint (8-byte integer)
@@ -439,11 +482,11 @@ def _source_pageset_to_parquet(

# build output query and filepath base
# (chunked output will append offset to keep output paths unique)
if str(source["source_path"].suffix).lower() == ".csv":
if source_type == ".csv":
base_query = f"SELECT {select_columns} FROM read_csv_auto('{str(source['source_path'])}', header=TRUE, delim=',')"
result_filepath_base = f"{source_dest_path}/{str(source['source_path'].stem)}"

elif str(source["source_path"].suffix).lower() == ".sqlite":
elif source_type == ".sqlite":
base_query = f"SELECT {select_columns} FROM sqlite_scan('{str(source['source_path'])}', '{str(source['table_name'])}')"
result_filepath_base = f"{source_dest_path}/{str(source['source_path'].stem)}.{source['table_name']}"

@@ -840,7 +883,7 @@ def _join_source_pageset(
dest_path: str,
joins: str,
page_key: str,
pageset: Tuple[int, int],
pageset: Union[Tuple[int, int], None],
sort_output: bool,
drop_null: bool,
) -> str:
@@ -877,7 +920,7 @@
)
SELECT *
FROM joined
WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]}
{f"WHERE {page_key} BETWEEN {pageset[0]} AND {pageset[1]}" if pageset is not None else ""}
/* optional sorting per pageset */
{"ORDER BY " + page_key if sort_output else ""};
"""
@@ -902,11 +945,13 @@

result_file_path = (
# store the result in the parent of the dest_path
f"{str(pathlib.Path(dest_path).parent)}/"
# use the dest_path stem in the name
+ f"{str(pathlib.Path(dest_path).stem)}"
# add the pageset indication to the filename
# (parenthesized so the fallback applies only to the suffix)
+ (
f"-{pageset[0]}-{pageset[1]}.parquet"
if pageset is not None
else ".parquet"
)
)
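As a concrete example of the corrected construction: with dest_path "/tmp/result/join.parquet" and pageset (0, 999), the result lands at "/tmp/result/join-0-999.parquet", while a None pageset falls back to "/tmp/result/join.parquet".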

# write the result
@@ -1001,9 +1046,9 @@ def _concat_join_sources(
def _infer_source_group_common_schema(
source_group: List[Dict[str, Any]],
data_type_cast_map: Optional[Dict[str, str]] = None,
) -> List[Tuple[str, str]]:
) -> List[Tuple[str, pa.DataType]]:
"""
Infers a common schema for group of parquet files which may have
Infers a common schema for a group of parquet files which may have
similar but slightly different schema or data. Intended to assist with
data concatenation and other operations.

@@ -1015,80 +1060,85 @@
A dictionary mapping data type groups to specific types.
Roughly includes Arrow data types language from:
https://arrow.apache.org/docs/python/api/datatypes.html

Returns:
List[Tuple[str, str]]
List[Tuple[str, pa.DataType]]
A list of tuples which includes column name and PyArrow datatype.
This data will later be used as the basis for forming a PyArrow schema.
"""

import pyarrow as pa
import pyarrow.parquet as parquet

from cytotable.exceptions import SchemaException
from cytotable.utils import map_pyarrow_type

# read first file for basis of schema and column order for all others
# Read the first file to establish the base schema
common_schema = parquet.read_schema(source_group[0]["table"][0])

# infer common basis of schema and column order for all others
# Infer the common schema by comparing all schemas in the group
for schema in [
parquet.read_schema(table)
for source in source_group
for table in source["table"]
]:
# account for completely equal schema
# Skip if the schema matches the common schema
if schema.equals(common_schema):
continue

# gather field names from schema
# Gather field names from the schema
schema_field_names = [item.name for item in schema]

# reversed enumeration because removing indexes ascendingly changes schema field order
# Reverse enumeration to avoid index shifting when removing fields
for index, field in reversed(list(enumerate(common_schema))):
# check whether field name is contained within writer basis, remove if not
# note: because this only checks for naming, we defer to initially detected type
# Remove fields not present in the current schema
if field.name not in schema_field_names:
common_schema = common_schema.remove(index)

# check if we have a nulltype and non-nulltype conflict, deferring to non-nulltype
# Handle null vs non-null type conflicts
elif pa.types.is_null(field.type) and not pa.types.is_null(
schema.field(field.name).type
):
common_schema = common_schema.set(
index, field.with_type(schema.field(field.name).type)
)

# check if we have an integer to float challenge and enable later casting
# Handle integer to float type conflicts
elif pa.types.is_integer(field.type) and pa.types.is_floating(
schema.field(field.name).type
):
common_schema = common_schema.set(
index,
field.with_type(
# use float64 as a default here if we aren't casting floats
pa.float64()
if data_type_cast_map is None
or "float" not in data_type_cast_map.keys()
# otherwise use the float data type cast type
else pa.type_for_alias(data_type_cast_map["float"])
else pa.type_for_alias(
data_type_cast_map.get("float", "float64")
)
),
)

if len(list(common_schema.names)) == 0:
raise SchemaException(
(
"No common schema basis to perform concatenation for source group."
" All columns mismatch one another within the group."
)
)
# Handle nested or complex types dynamically
else:
common_schema = common_schema.set(
index,
field.with_type(
map_pyarrow_type(
field_type=field.type, data_type_cast_map=data_type_cast_map
)
),
)

# return a python-native list of tuples with column names and str types
return list(
zip(
common_schema.names,
[str(schema_type) for schema_type in common_schema.types],
# Validate the schema to ensure all types are valid PyArrow types
validated_schema = [
(
field.name,
map_pyarrow_type(
field_type=field.type, data_type_cast_map=data_type_cast_map
),
)
)
for field in common_schema
]

return validated_schema
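map_pyarrow_type is imported from cytotable.utils and its body is not part of this diff; a rough sketch of what such a mapper might do, assuming it applies data_type_cast_map to leaf types and recurses through list types (the real helper likely covers additional nesting and type groups):

from typing import Dict, Optional
import pyarrow as pa

def map_pyarrow_type_sketch(
    field_type: pa.DataType,
    data_type_cast_map: Optional[Dict[str, str]] = None,
) -> pa.DataType:
    if data_type_cast_map is None:
        return field_type
    # recurse through list types so nested value types are also mapped
    if pa.types.is_list(field_type):
        return pa.list_(
            map_pyarrow_type_sketch(field_type.value_type, data_type_cast_map)
        )
    # map leaf types by their group name when a cast is requested
    if pa.types.is_floating(field_type) and "float" in data_type_cast_map:
        return pa.type_for_alias(data_type_cast_map["float"])
    if pa.types.is_integer(field_type) and "integer" in data_type_cast_map:
        return pa.type_for_alias(data_type_cast_map["integer"])
    return field_type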


def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
@@ -1185,9 +1235,9 @@ def _to_parquet( # pylint: disable=too-many-arguments, too-many-locals
matching_keys = [
key for key in page_keys.keys() if key.lower() in source_group_name.lower()
]
if not matching_keys:
if not matching_keys and source_datatype != "npz":
raise CytoTableException(
f"No matching key found in page_keys for source_group_name: {source_group_name}."
f"No matching key found in page_keys for source_group_name: {source_group_name}. "
"Please include a pagination key based on a column name from the table."
)

@@ -1198,11 +1248,16 @@
source,
**{
"page_key": (
page_key := [
value
for key, value in page_keys.items()
if key.lower() in source_group_name.lower()
][0]
page_key := next(
(
value
for key, value in page_keys.items()
if key.lower() in source_group_name.lower()
),
# Placeholder value used if no match is found,
# as is the case for .npz source types.
"placeholder",
)
),
"pagesets": _get_table_keyset_pagination_sets(
source=source,
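The next(..., default) pattern above replaces indexing into a possibly empty list comprehension; a small standalone illustration with hypothetical page keys:

page_keys = {"cells": "Metadata_Cells_Number", "image": "Metadata_ImageNumber"}
source_group_name = "per_image.sqlite"

page_key = next(
    (
        value
        for key, value in page_keys.items()
        if key.lower() in source_group_name.lower()
    ),
    # fall back to a placeholder when nothing matches (e.g. .npz sources)
    "placeholder",
)
# page_key == "Metadata_ImageNumber" here; an .npz source group with no
# matching key would yield "placeholder" instead.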