Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions src/genomicsdb.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,15 @@ cdef class _GenomicsDB:
flatten_intervals=False,
json_output=None,
arrow_output=None,
# batching only used with arrow_output
batching=False):
# batching/compress only used with arrow_output
batching=False,
compress=None):
""" Query for variant calls from the GenomicsDB workspace using array, column_ranges and row_ranges for subsetting """

if json_output is not None:
return self.query_variant_calls_json(array, column_ranges, row_ranges, query_protobuf, json_output);
elif arrow_output is not None:
return self.query_variant_calls_arrow(array, column_ranges, row_ranges, query_protobuf, batching);
return self.query_variant_calls_arrow(array, column_ranges, row_ranges, query_protobuf, batching, compress);
elif flatten_intervals is True:
return self.query_variant_calls_columnar(array, column_ranges, row_ranges, query_protobuf)
else:
Expand Down Expand Up @@ -318,7 +319,8 @@ cdef class _GenomicsDB:
column_ranges=None,
row_ranges=None,
query_protobuf: query_pb.QueryConfiguration=None,
batching=False):
batching=False,
compress=None):
""" Query for variant calls from the GenomicsDB workspace using array, column_ranges and row_ranges for subsetting """

cdef ArrowVariantCallProcessor processor
Expand Down Expand Up @@ -366,19 +368,24 @@ cdef class _GenomicsDB:
schema_capsule = pycapsule_get_arrow_schema(arrow_schema)
schema_obj = _ArrowSchemaWrapper._import_from_c_capsule(schema_capsule)
schema = pa.schema(schema_obj.children_schema)
yield schema.serialize().to_pybytes()
else:
raise GenomicsDBException("Failed to retrieve arrow schema for query_variant_calls()")

cdef void* arrow_array = NULL
w_opts = pa.ipc.IpcWriteOptions(allow_64bit=True, compression=compress)
while True:
try:
arrow_array = processor.arrow_array()
if arrow_array:
array_capsule = pycapsule_get_arrow_array(arrow_array)
array_obj = _ArrowArrayWrapper._import_from_c_capsule(schema_capsule, array_capsule)
arrays = [pa.array(array_obj.child(i)) for i in range(array_obj.n_children)]
yield pa.RecordBatch.from_arrays(arrays, schema=schema).serialize().to_pybytes()
batch = pa.RecordBatch.from_arrays(arrays, schema=schema)
sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, schema, options=w_opts)
writer.write_batch(batch)
writer.close()
yield sink.getvalue().to_pybytes()
else:
break
except Exception as e:
Expand Down
24 changes: 8 additions & 16 deletions test/test_genomicsdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,29 +78,21 @@ def test_connect_and_query_with_protobuf(setup):
json_output=9999)

# test with query protobuf and arrow output
first = True
schema = pa.schema([("null_field", pa.string())])
for output in gdb.query_variant_calls(row_ranges=[(0, 3)], array="t0_1_2", arrow_output=True):
if first:
schema = pa.ipc.read_schema(pa.py_buffer(output))
first = False
else:
batch = pa.ipc.read_record_batch(pa.py_buffer(output), schema)
assert batch.num_columns == 6
assert batch.num_rows == 5
reader = pa.ipc.open_stream(output)
batch = reader.read_next_batch()
assert batch.num_columns == 6
assert batch.num_rows == 5

first = True
batch = None
for output in gdb.query_variant_calls(
row_ranges=[(0, 3)], array="t0_1_2", arrow_output=True, batching=True
):
if first:
schema = pa.ipc.read_schema(pa.py_buffer(output))
first = False
else:
batch = pa.ipc.read_record_batch(pa.py_buffer(output), schema)
assert batch.num_columns == 6
assert batch.num_rows == 1 or batch.num_rows == 3
reader = pa.ipc.open_stream(output)
batch = reader.read_next_batch()
assert batch.num_columns == 6
assert batch.num_rows == 1 or batch.num_rows == 3

# test with query contig interval and no results
interval = query_coords.ContigInterval()
Expand Down
10 changes: 3 additions & 7 deletions test/test_genomicsdb_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,10 @@ def test_genomicsdb_demo_with_arrow_output(self):
start = time.time()
gdb = genomicsdb.connect_with_protobuf(self.query_config)
print("\nSummary for batching mode=" + str(batching_mode) + ":")
first = True
for output in gdb.query_variant_calls(arrow_output=True, batching=batching_mode):
if first:
schema = pa.ipc.read_schema(pa.py_buffer(output))
first = False
else:
batch = pa.ipc.read_record_batch(pa.py_buffer(output), schema)
print("batch num_rows="+str(batch.num_rows)+" num_columns="+str(batch.num_columns))
reader = pa.ipc.open_stream(output)
batch = reader.read_next_batch()
print("batch num_rows="+str(batch.num_rows)+" num_columns="+str(batch.num_columns))
print("\tElapsed time: " + str(time.time() - start))

if __name__ == '__main__':
Expand Down