From 6f335ee020e43ccd429f24a592505f84de146bd7 Mon Sep 17 00:00:00 2001
From: Melvin Lathara
Date: Thu, 5 Sep 2024 16:30:52 -0700
Subject: [PATCH] minor streamwriter changes to allow compression

---
 src/genomicsdb.pyx           | 19 +++++++++++++------
 test/test_genomicsdb.py      | 24 ++++++++----------------
 test/test_genomicsdb_demo.py | 10 +++-------
 3 files changed, 24 insertions(+), 29 deletions(-)

diff --git a/src/genomicsdb.pyx b/src/genomicsdb.pyx
index dcec664..a9d92d0 100644
--- a/src/genomicsdb.pyx
+++ b/src/genomicsdb.pyx
@@ -199,14 +199,15 @@ cdef class _GenomicsDB:
                             flatten_intervals=False,
                             json_output=None,
                             arrow_output=None,
-                            # batching only used with arrow_output
-                            batching=False):
+                            # batching/compress only used with arrow_output
+                            batching=False,
+                            compress=None):
         """ Query for variant calls from the GenomicsDB workspace using array, column_ranges and row_ranges for subsetting """
 
         if json_output is not None:
             return self.query_variant_calls_json(array, column_ranges, row_ranges, query_protobuf, json_output);
         elif arrow_output is not None:
-            return self.query_variant_calls_arrow(array, column_ranges, row_ranges, query_protobuf, batching);
+            return self.query_variant_calls_arrow(array, column_ranges, row_ranges, query_protobuf, batching, compress);
         elif flatten_intervals is True:
             return self.query_variant_calls_columnar(array, column_ranges, row_ranges, query_protobuf)
         else:
@@ -318,7 +319,8 @@ cdef class _GenomicsDB:
                                   column_ranges=None,
                                   row_ranges=None,
                                   query_protobuf: query_pb.QueryConfiguration=None,
-                                  batching=False):
+                                  batching=False,
+                                  compress=None):
         """ Query for variant calls from the GenomicsDB workspace using array, column_ranges and row_ranges for subsetting """
 
         cdef ArrowVariantCallProcessor processor
@@ -366,11 +368,11 @@ cdef class _GenomicsDB:
             schema_capsule = pycapsule_get_arrow_schema(arrow_schema)
             schema_obj = _ArrowSchemaWrapper._import_from_c_capsule(schema_capsule)
             schema = pa.schema(schema_obj.children_schema)
-            yield schema.serialize().to_pybytes()
         else:
             raise GenomicsDBException("Failed to retrieve arrow schema for query_variant_calls()")
 
         cdef void* arrow_array = NULL
+        w_opts = pa.ipc.IpcWriteOptions(allow_64bit=True, compression=compress)
         while True:
             try:
                 arrow_array = processor.arrow_array()
@@ -378,7 +380,12 @@ cdef class _GenomicsDB:
                     array_capsule = pycapsule_get_arrow_array(arrow_array)
                     array_obj = _ArrowArrayWrapper._import_from_c_capsule(schema_capsule, array_capsule)
                     arrays = [pa.array(array_obj.child(i)) for i in range(array_obj.n_children)]
-                    yield pa.RecordBatch.from_arrays(arrays, schema=schema).serialize().to_pybytes()
+                    batch = pa.RecordBatch.from_arrays(arrays, schema=schema)
+                    sink = pa.BufferOutputStream()
+                    writer = pa.RecordBatchStreamWriter(sink, schema, options=w_opts)
+                    writer.write_batch(batch)
+                    writer.close()
+                    yield sink.getvalue().to_pybytes()
                 else:
                     break
             except Exception as e:
diff --git a/test/test_genomicsdb.py b/test/test_genomicsdb.py
index 7e0a38f..f7e0667 100644
--- a/test/test_genomicsdb.py
+++ b/test/test_genomicsdb.py
@@ -78,29 +78,21 @@ def test_connect_and_query_with_protobuf(setup):
                                 json_output=9999)
 
     # test with query protobuf and arrow output
-    first = True
     schema = pa.schema([("null_field", pa.string())])
     for output in gdb.query_variant_calls(row_ranges=[(0, 3)], array="t0_1_2", arrow_output=True):
-        if first:
-            schema = pa.ipc.read_schema(pa.py_buffer(output))
-            first = False
-        else:
-            batch = pa.ipc.read_record_batch(pa.py_buffer(output), schema)
-            assert batch.num_columns == 6
-            assert batch.num_rows == 5
+        reader = pa.ipc.open_stream(output)
+        batch = reader.read_next_batch()
+        assert batch.num_columns == 6
+        assert batch.num_rows == 5
 
-    first = True
     batch = None
     for output in gdb.query_variant_calls(
         row_ranges=[(0, 3)], array="t0_1_2", arrow_output=True, batching=True
     ):
-        if first:
-            schema = pa.ipc.read_schema(pa.py_buffer(output))
-            first = False
-        else:
-            batch = pa.ipc.read_record_batch(pa.py_buffer(output), schema)
-            assert batch.num_columns == 6
-            assert batch.num_rows == 1 or batch.num_rows == 3
+        reader = pa.ipc.open_stream(output)
+        batch = reader.read_next_batch()
+        assert batch.num_columns == 6
+        assert batch.num_rows == 1 or batch.num_rows == 3
 
     # test with query contig interval and no results
     interval = query_coords.ContigInterval()
diff --git a/test/test_genomicsdb_demo.py b/test/test_genomicsdb_demo.py
index 766dfa4..e49c168 100644
--- a/test/test_genomicsdb_demo.py
+++ b/test/test_genomicsdb_demo.py
@@ -111,14 +111,10 @@ def test_genomicsdb_demo_with_arrow_output(self):
         start = time.time()
         gdb = genomicsdb.connect_with_protobuf(self.query_config)
         print("\nSummary for batching mode=" + str(batching_mode) + ":")
-        first = True
         for output in gdb.query_variant_calls(arrow_output=True, batching=batching_mode):
-            if first:
-                schema = pa.ipc.read_schema(pa.py_buffer(output))
-                first = False
-            else:
-                batch = pa.ipc.read_record_batch(pa.py_buffer(output), schema)
-                print("batch num_rows="+str(batch.num_rows)+" num_columns="+str(batch.num_columns))
+            reader = pa.ipc.open_stream(output)
+            batch = reader.read_next_batch()
+            print("batch num_rows="+str(batch.num_rows)+" num_columns="+str(batch.num_columns))
         print("\tElapsed time: " + str(time.time() - start))
 
 if __name__ == '__main__':
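
Usage note: after this patch each yielded chunk is a self-contained Arrow IPC
stream (schema message followed by one record batch), so consumers can decode
every chunk independently with pa.ipc.open_stream() instead of tracking a
separate schema message. A minimal consumer-side sketch follows; query_config
stands in for a pre-built query protobuf as used in test_genomicsdb_demo.py,
and compress accepts the codec strings pyarrow.ipc.IpcWriteOptions supports
("lz4" or "zstd"), assuming the local PyArrow build includes that codec:

    import pyarrow as pa
    import genomicsdb

    # query_config: hypothetical pre-built query protobuf (see the demo test)
    gdb = genomicsdb.connect_with_protobuf(query_config)

    for output in gdb.query_variant_calls(arrow_output=True,
                                          batching=True,
                                          compress="zstd"):
        # Each chunk carries its own schema, so no cross-chunk state is needed.
        reader = pa.ipc.open_stream(output)
        batch = reader.read_next_batch()
        print(batch.num_rows, batch.num_columns)

The per-batch stream writer trades a little per-chunk overhead (a repeated
schema message) for chunks that can be decompressed and decoded independently.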