2 changes: 1 addition & 1 deletion beacon_api/conf/config.ini
@@ -7,7 +7,7 @@
title=GA4GHBeacon at CSC

# Version of the Beacon implementation
version=1.7.2
version=1.8.0

# Author of this software
author=CSC developers
141 changes: 85 additions & 56 deletions beacon_api/utils/db_load.py
@@ -37,6 +37,7 @@
"""

import os
import sys
import argparse
import json
import itertools
@@ -46,6 +47,7 @@
import asyncpg
from cyvcf2 import VCF

from pathlib import Path
from datetime import datetime

from .logging import LOG
@@ -280,19 +282,19 @@ def _chunks(self, iterable, size):
for first in iterator:
yield itertools.chain([first], itertools.islice(iterator, size - 1))

async def load_datafile(self, vcf, datafile, dataset_id, n=1000):
async def load_datafile(self, vcf, datafile, dataset_id, n=1000, min_ac=1):
"""Parse data from datafile and send it to be inserted."""
LOG.info(f"Read data from {datafile}")
try:
LOG.info("Generate database queue(s)")
data = self._chunks(vcf, n)
for record in data:
await self.insert_variants(dataset_id, list(record))
await self.insert_variants(dataset_id, list(record), min_ac)
LOG.info(f"{datafile} has been processed")
except Exception as e:
LOG.error(f"AN ERROR OCCURRED WHILE GENERATING DB QUEUE -> {e}")

async def insert_variants(self, dataset_id, variants):
async def insert_variants(self, dataset_id, variants, min_ac):
"""Insert variant data to the database."""
LOG.info(f"Received {len(variants)} variants for insertion to {dataset_id}")
try:
@@ -301,67 +303,82 @@ async def insert_variants(self, dataset_id, variants):
LOG.info("Insert variants into the database")
for variant in variants:
# params = (frequency, count, actual variant Type)
# Nothing interesting on the variant with no aaf
# because none of the samples have it
if variant.aaf > 0:
params = self._unpack(variant)
# Coordinates that are read from VCF are 1-based,
# cyvcf2 reads them as 0-based, and they are inserted into the DB as such

# We Process Breakend Records into a different table for now
if params[5] != []:
# await self.insert_mates(dataset_id, variant, params)
# Most likely there will be only one BND per Record
for bnd in params[5]:
params = self._unpack(variant)
# Coordinates that are read from VCF are 1-based,
# cyvcf2 reads them as 0-based, and they are inserted into the DB as such

# params may carry a single variant [1] or packed variants [20, 15, 10, 1]
# The first check is enough for single variants; packed variants below the threshold must be removed afterwards
if params[1][0] >= min_ac:
# Remove packed variants that don't meet the minimum allele count requirement
# Packed allele counts are always ordered from largest to smallest, so values are
# popped from the right (small) end until none below the threshold remain
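# Hypothetical illustration: with min_ac=10 and packed counts params[1] == [20, 15, 10, 1],
# one pop per list removes the trailing 1, leaving the aaf, ac, vt and alt lists aligned at [20, 15, 10]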
while params[1][-1] < min_ac:
params[0].pop() # aaf
params[1].pop() # ac
params[2].pop() # vt
params[3].pop() # alt
if len(params[5]) > 0:
params[5].pop() # bnd

# Nothing interesting on the variant with no aaf
# because none of the samples have it
if variant.aaf > 0:

# We Process Breakend Records into a different table for now
if params[5] != []:
# await self.insert_mates(dataset_id, variant, params)
# Most likely there will be only one BND per Record
for bnd in params[5]:
await self._conn.execute(
"""INSERT INTO beacon_mate_table
(datasetId, chromosome, chromosomeStart, chromosomePos,
mate, mateStart, matePos, reference, alternate, alleleCount,
callCount, frequency, "end")
SELECT ($1), ($2), ($3), ($4),
($5), ($6), ($7), ($8), t.alt, t.ac, ($11), t.freq, ($13)
FROM (SELECT unnest($9::varchar[]) alt, unnest($10::integer[]) ac,
unnest($12::float[]) freq) t
ON CONFLICT (datasetId, chromosome, mate, chromosomePos, matePos)
DO NOTHING""",
dataset_id,
variant.CHROM.replace("chr", ""),
variant.start,
variant.ID,
bnd[0].replace("chr", ""),
bnd[1],
bnd[6],
variant.REF,
params[3],
params[1],
params[4],
params[0],
variant.end,
)
else:
await self._conn.execute(
"""INSERT INTO beacon_mate_table
(datasetId, chromosome, chromosomeStart, chromosomePos,
mate, mateStart, matePos, reference, alternate, alleleCount,
callCount, frequency, "end")
SELECT ($1), ($2), ($3), ($4),
($5), ($6), ($7), ($8), t.alt, t.ac, ($11), t.freq, ($13)
FROM (SELECT unnest($9::varchar[]) alt, unnest($10::integer[]) ac,
unnest($12::float[]) freq) t
ON CONFLICT (datasetId, chromosome, mate, chromosomePos, matePos)
DO NOTHING""",
"""INSERT INTO beacon_data_table
(datasetId, chromosome, start, reference, alternate,
"end", aggregatedVariantType, alleleCount, callCount, frequency, variantType)
SELECT ($1), ($2), ($3), ($4), t.alt, ($6), ($7), t.ac, ($9), t.freq, t.vt
FROM (SELECT unnest($5::varchar[]) alt, unnest($8::integer[]) ac,
unnest($10::float[]) freq, unnest($11::varchar[]) as vt) t
ON CONFLICT (datasetId, chromosome, start, reference, alternate)
DO NOTHING""",
dataset_id,
variant.CHROM.replace("chr", ""),
variant.start,
variant.ID,
bnd[0].replace("chr", ""),
bnd[1],
bnd[6],
variant.REF,
params[3],
variant.end,
variant.var_type.upper(),
params[1],
params[4],
params[0],
variant.end,
params[2],
)
else:
await self._conn.execute(
"""INSERT INTO beacon_data_table
(datasetId, chromosome, start, reference, alternate,
"end", aggregatedVariantType, alleleCount, callCount, frequency, variantType)
SELECT ($1), ($2), ($3), ($4), t.alt, ($6), ($7), t.ac, ($9), t.freq, t.vt
FROM (SELECT unnest($5::varchar[]) alt, unnest($8::integer[]) ac,
unnest($10::float[]) freq, unnest($11::varchar[]) as vt) t
ON CONFLICT (datasetId, chromosome, start, reference, alternate)
DO NOTHING""",
dataset_id,
variant.CHROM.replace("chr", ""),
variant.start,
variant.REF,
params[3],
variant.end,
variant.var_type.upper(),
params[1],
params[4],
params[0],
params[2],
)

LOG.debug("Variants have been inserted")

LOG.debug("Variants have been inserted")
except Exception as e:
LOG.error(f"AN ERROR OCCURRED WHILE ATTEMPTING TO INSERT VARIANTS -> {e}")

Expand All @@ -379,6 +396,7 @@ async def init_beacon_db(arguments=None):
"""Run database operations here."""
# Fetch command line arguments
args = parse_arguments(arguments)
validate_arguments(args)

# Initialise the database connection
db = BeaconDB()
@@ -400,12 +418,22 @@
dataset_id = await db.load_metadata(vcf, args.metadata, args.datafile)

# Insert data into the database
await db.load_datafile(vcf, args.datafile, dataset_id)
await db.load_datafile(vcf, args.datafile, dataset_id, min_ac=int(args.min_allele_count))

# Close the database connection
await db.close()


def validate_arguments(arguments):
"""Check that given arguments are valid."""
if not Path(arguments.datafile).is_file():
sys.exit(f"Could not find datafile: {arguments.datafile}")
if not Path(arguments.metadata).is_file():
sys.exit(f"Could not find metadata file: {arguments.metadata}")
if not arguments.min_allele_count.isdigit():
sys.exit(f"Minimum allele count --min_allele_count must be a positive integer, received: {arguments.min_allele_count}")


def parse_arguments(arguments):
"""Parse command line arguments."""
parser = argparse.ArgumentParser(
@@ -415,7 +443,8 @@ def parse_arguments(arguments):
)
parser.add_argument("datafile", help=".vcf file containing variant information")
parser.add_argument("metadata", help=".json file containing metadata associated to datafile")
parser.add_argument("--samples", default=None, help="comma separated string of samples to process")
parser.add_argument("--samples", default=None, help="comma separated string of samples to process. EXPERIMENTAL")
parser.add_argument("--min_allele_count", default="1", help="minimum allele count can be raised to ignore rare variants. Default value is 1")
return parser.parse_args(arguments)


31 changes: 22 additions & 9 deletions docs/instructions.rst
@@ -160,10 +160,11 @@ Starting PostgreSQL using Docker:
.. code-block:: console

cd beacon-python
docker run -e POSTGRES_USER=beacon \
docker run -d \
-e POSTGRES_USER=beacon \
-e POSTGRES_PASSWORD=beacon \
-e POSTGRES_DB=beacondb \
-v "$PWD/data":/docker-entrypoint-initdb.d
-v "$PWD/data":/docker-entrypoint-initdb.d \
-p 5432:5432 postgres:11.6

.. hint:: If one has their own database the ``beacon_init`` utility can be skipped,
@@ -182,19 +183,25 @@ For loading datasets to database we provide the ``beacon_init`` utility:

.. code-block:: console

╰─$ beacon_init --help
usage: beacon_init [-h] datafile metadata
$ beacon_init --help
usage: beacon_init [-h] [--samples SAMPLES]
[--min_allele_count MIN_ALLELE_COUNT]
datafile metadata

Load datafiles with associated metadata into the beacon database. See example
data and metadata files in the /data directory.

positional arguments:
datafile .vcf file containing variant information
metadata .json file containing metadata associated to datafile
datafile .vcf file containing variant information
metadata .json file containing metadata associated to datafile

optional arguments:
--samples comma separated string of samples to process
-h, --help show this help message and exit
-h, --help show this help message and exit
--samples SAMPLES comma separated string of samples to process.
EXPERIMENTAL
--min_allele_count MIN_ALLELE_COUNT
minimum allele count can be raised to ignore rare
variants. Default value is 1

As an example, a dataset metadata could be:

@@ -221,12 +228,18 @@ For loading data into the database we can proceed as follows:

$ beacon_init data/ALL.chrMT.phase3_callmom-v0_4.20130502.genotypes.vcf.gz data/example_metadata.json

For loading data into the database from selected samples only we can proceed as follows:
(EXPERIMENTAL) For loading data into the database from selected samples only we can proceed as follows:

.. code-block:: console

$ beacon_init data/ALL.chrMT.phase3_callmom-v0_4.20130502.genotypes.vcf.gz data/example_metadata.json --samples HG0001,HG0002,HG0003

For ignoring rare alleles, set a minimum allele count with ``--min_allele_count``:

.. code-block:: console

$ beacon_init data/ALL.chrMT.phase3_callmom-v0_4.20130502.genotypes.vcf.gz data/example_metadata.json --min_allele_count 20
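
To spot-check the effect of the threshold after a load, the ``beacon_data_table`` can be queried
directly. The snippet below is a minimal sketch rather than part of ``beacon_init``: the connection
details assume the Docker setup shown above and the ``alleleCount`` column named in ``db_load.py``,
so adjust both to your own deployment.

.. code-block:: python

    import asyncio

    import asyncpg


    async def count_loaded_variants(min_ac: int = 20) -> None:
        """Report how many loaded variants fall below a given allele count."""
        conn = await asyncpg.connect(user="beacon", password="beacon",
                                     database="beacondb", host="localhost")
        try:
            total = await conn.fetchval("SELECT count(*) FROM beacon_data_table")
            below = await conn.fetchval(
                "SELECT count(*) FROM beacon_data_table WHERE alleleCount < $1", min_ac
            )
            print(f"{total} variants loaded, {below} below the allele count threshold")
        finally:
            await conn.close()


    asyncio.run(count_loaded_variants())

If the loader was run with ``--min_allele_count 20``, the second count is expected to be ``0``.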

.. note:: One dataset can have multiple files; to add more files to one dataset, repeat the command above.
The parameters ``callCount`` and ``variantCount`` from the metadata file reflect values of the entire dataset.
These values can be initialised with ``0`` if they are not known and updated in ``beacon_dataset_counts_table`` table.
4 changes: 2 additions & 2 deletions tests/test_basic.py
@@ -59,15 +59,15 @@ async def create_tables(self, sql_file):
"""Mimic create_tables."""
pass

async def insert_variants(self, dataset_id, variants, len_samples):
async def insert_variants(self, dataset_id, variants, min_ac):
"""Mimic insert_variants."""
pass

async def load_metadata(self, vcf, metafile, datafile):
"""Mimic load_metadata."""
pass

async def load_datafile(self, vcf, datafile, datasetId):
async def load_datafile(self, vcf, datafile, datasetId, n=1000, min_ac=1):
"""Mimic load_datafile."""
return ["datasetId", "variants"]

2 changes: 1 addition & 1 deletion tests/test_db_load.py
@@ -312,7 +312,7 @@ async def test_insert_variants(self, db_mock, mock_log):
db_mock.return_value = Connection()
await self._db.connection()
db_mock.assert_called()
await self._db.insert_variants('DATASET1', ['C'])
await self._db.insert_variants('DATASET1', ['C'], 1)
# Should assert logs
mock_log.info.mock_calls = ['Received 1 variants for insertion to DATASET1',
'Insert variants into the database']