genomic relatedness detection pipeline


Description

GRAPE is an open-source end-to-end Genomic RelAtedness detection PipelinE.

It requires a single multi-sample VCF file as input and has a separate workflow for downloading and checking the integrity of the reference datasets.

GRAPE incorporates comprehensive data preprocessing, quality control (QC), and several workflows for relatedness inference.

scheme

Installation

docker build -t grape:latest -f containers/snakemake/Dockerfile -m 8GB .

The pipeline has three main steps:

  1. Downloading of the reference datasets.

  2. Quality control and data preprocessing.

  3. Application of the relationship inference workflow.

The pipeline is implemented with the Snakemake framework and can also be accessed directly through the snakemake command.

All the pipeline functionality is embedded into the GRAPE launcher, launcher.py , which invokes Snakemake under the hood. By default, every command only checks the accessibility of the data and builds the computational graph (a dry run).

To actually perform the computation, add the --real-run flag to the command.

Downloading the Reference Datasets

This step only needs to be done once.

To download the reference datasets, use the reference command of the pipeline launcher.

The command below downloads the required references into the directory specified by the --ref-directory flag. The same --ref-directory argument should then be passed to all subsequent commands.

docker run --rm -it -v /media:/media -v /etc/localtime:/etc/localtime:ro \
 grape:latest launcher.py reference --ref-directory /media/ref --real-run

If phasing and imputation are required (mainly for the GERMLINE workflow), add the --phase and --impute flags to the previous command to download the additional reference datasets.

The total required disk space is at least 50 GB.

docker run --rm -it -v /media:/media -v /etc/localtime:/etc/localtime:ro \
 grape:latest launcher.py reference \
 --ref-directory /media/ref --phase --impute --real-run

There is also an option to download all required reference data as a single bundle file. This bundle is prepared by the GenX team and preloaded in the cloud on our side.

To use it, add the --use-bundle flag to the reference command. This way is faster, since all post-processing procedures have already been performed.

docker run --rm -it -v /media:/media -v /etc/localtime:/etc/localtime:ro \
 grape:latest launcher.py reference --use-bundle \
 --ref-directory /media/ref --phase --impute --real-run

Quality Control and Data Preprocessing

GRAPE has a versatile and configurable preprocessing workflow. One part of the preprocessing is required and must be performed before the relationship inference workflow.

Preprocessing is launched by the preprocess command of the GRAPE launcher. Along with some necessary technical procedures, preprocessing includes the following steps.

  • [Required] SNP quality control by minor allele frequency (MAF) and missingness rate . We discovered that blocks of rare SNPs with low MAF values in genotyping arrays may produce false positive IBD segments. To address this problem, we filter SNPs by minor allele frequency and remove SNPs with a MAF below 0.02. Additionally, we remove multiallelic SNPs, insertions / deletions, and SNPs with a high missingness rate, because such SNPs are incompatible with IBD detection tools. An illustrative sketch of this filtering is given after this list.

  • [Required] Per-sample quality control using missingness and heterozygosity . Extensive testing revealed that samples with an unusually low level of heterozygosity can produce many false relative matches among individuals. GRAPE excludes such samples from the analysis and creates a report file describing the reason for exclusion.

  • [Required] Control for strand and SNP ID mismatches . During this step GRAPE fixes inconsistencies in strands and reference alleles.

  • [Optional] LiftOver from hg38 to hg37 . Currently GRAPE uses the hg37 build of the human genome reference. The pipeline supports input in the hg38 and hg37 builds (see the --assembly flag of the pipeline launcher). If the hg38 build is selected ( --assembly hg38 ), GRAPE applies the liftOver tool to the input data in order to match the hg37 reference assembly.

  • [Optional] Phasing and imputation . GRAPE supports phasing and genotype imputation using the 1000 Genomes Project reference panel. The GERMLINE IBD detection tool requires phased data, so if the input data is unphased, phasing (Eagle 2.4.1, --phase flag) must be included in the preprocessing before running the GERMLINE workflow. If the input data is highly heterogeneous in terms of available SNP positions, imputation (Minimac4, --impute flag) can also be added to the preprocessing.

  • [Optional] Removal of imputed SNPs . We found that if the input data is homogeneous in terms of SNP positions, the presence of imputed SNPs does not affect the overall IBD detection accuracy of the IBIS tool, but it significantly slows down the pipeline. For this particular case, when the input data initially contains many imputed SNPs, we recommend removing them by passing the --remove-imputation flag to the GRAPE launcher. GRAPE removes all SNPs marked with the IMPUTED flag in the input VCF file.
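
For illustration only, here is a minimal sketch of the SNP-level filtering logic described above; the actual pipeline performs these steps with PLINK and bcftools inside Snakemake rules. The 0.02 MAF cut-off follows the text, while the 0.1 missingness threshold and the array layout are assumptions made for the example.

import numpy as np

def filter_snps(genotypes, maf_threshold=0.02, max_missing_rate=0.1):
    # genotypes: (n_snps, n_samples) array of alternate-allele counts {0, 1, 2},
    # with np.nan marking missing calls.
    missing = np.isnan(genotypes)
    missing_rate = missing.mean(axis=1)

    # Alternate allele frequency over non-missing genotypes; MAF is the minor side.
    alt_freq = np.nansum(genotypes, axis=1) / (2 * (~missing).sum(axis=1))
    maf = np.minimum(alt_freq, 1 - alt_freq)

    keep = (maf >= maf_threshold) & (missing_rate <= max_missing_rate)
    return genotypes[keep], keep

# Example: 4 SNPs x 3 samples; only the first SNP passes both filters
# (the second fails on missingness, the last two are monomorphic).
g = np.array([[0, 1, 2],
              [1, 1, np.nan],
              [2, 2, 2],
              [0, 0, 0]], dtype=float)
filtered, kept = filter_snps(g)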

Usage Examples

  1. Preprocessing for the IBIS + KING relatedness inference workflow.

    • Input file is located at /media/input.vcf.gz .

    • GRAPE working directory is /media/data .

    • Assembly of the input VCF file is hg37 .

docker run --rm -it -v /media:/media -v /etc/localtime:/etc/localtime:ro \
 grape:latest launcher.py preprocess --ref-directory /media/ref \
 --vcf-file /media/input.vcf.gz --directory /media/data --assembly hg37 --real-run
  2. Preprocessing for the GERMLINE + KING workflow.

    • Assembly of the input VCF file is hg38 .

    • GERMLINE can work with phased data only, so we add phasing procedure to the preprocessing.

    • Genotype imputation is also added.

docker run --rm -it -v /media:/media -v /etc/localtime:/etc/localtime:ro \
 grape:latest launcher.py preprocess --ref-directory /media/ref \
 --vcf-file /media/input.vcf.gz --directory /media/data \
 --assembly hg38 --phase --impute --flow germline-king --real-run

GRAPE Relatedness Inference Workflows

There are three main relationship inference workflows implemented in GRAPE, plus an experimental (beta) RaPID workflow.

These workflows are activated by the find command of the launcher.

Workflow selection is made by the --flow parameter.

  1. IBIS + ERSA , --flow ibis . In this workflow, IBD segment detection is performed by IBIS, and the relationship degree is estimated with the ERSA algorithm. This is the fastest workflow.

  2. IBIS + ERSA & KING , --flow ibis-king . In this workflow, GRAPE uses the KING tool for the first 3 degrees of relationship and the IBIS + ERSA approach for more distant degrees.

  3. GERMLINE + ERSA & KING , --flow germline-king . This workflow uses GERMLINE for IBD segment detection. KING is used to identify relationships for the first 3 degrees, and the ERSA algorithm is used for more distant degrees. This workflow was added to GRAPE mainly for the case when the input data is already phased and accurately preprocessed.

  4. (BETA!) RaPID + ERSA , --flow rapid . This workflow uses RaPID for IBD segment detection and requires phased and preprocessed data as input.

Usage Examples

  1. Relationship inference with the IBIS + ERSA & KING workflow.
docker run --rm -it -v /media:/media -v /etc/localtime:/etc/localtime:ro \
 grape:latest launcher.py find --flow ibis-king --ref-directory /media/ref \
 --directory /media/data --ibis-seg-len 7 --ibis-min-snp 500 \
 --zero-seg-count 0.5 --zero-seg-len 5.0 --alpha 0.01 --real-run
  2. Relationship inference with the GERMLINE + ERSA & KING workflow.
docker run --rm -it -v /media:/media -v /etc/localtime:/etc/localtime:ro \
 grape:latest launcher.py find --flow germline-king --ref-directory /media/ref \
 --directory /media/data --zero-seg-count 0.5 --zero-seg-len 5.0 \
 --alpha 0.01 --real-run

Description of the IBIS and ERSA Parameters

  • [IBIS] --ibis-seg-len . Minimum length of the IBD segment to be found by IBIS. Higher values reduce false positive rate and give less distant matches (default = 7 cM).

  • [IBIS] --ibis-min-snp . Minimum number of SNPs per IBD segment to be detected (default = 500 SNPs).

  • [ERSA] --zero-seg-count . Mean number of shared segments for two unrelated individuals in the population. Smaller values tend to give more distant matches and increase the false positive rate (default = 0.5).

  • [ERSA] --zero-seg-len . Average length of IBD segment for two unrelated individuals in the population. Smaller values tend to give more distant matches and increase the false positive rate (default = 5 cM).

  • [ERSA] --alpha . ERSA significance level (default = 0.01).

Description of the RaPID Parameters

  • [RaPID] --rapid-seg-len . Minimum length of the IBD segment to be found by RaPID. Higher values reduce false positive rate and give less distant matches (default = 5 cM).

  • [RaPID] --rapid-num-runs . Number of random projections used by the RaPID algorithm.

  • [RaPID] --rapid-num-success . Number of matches required for a segment. Higher values should reduce the false positive rate but also reduce recall.

Description of the Output Relatives File

The output relatives file is in TSV format and contains one line for each detected pair of relatives.

id1 id2 king_degree king_relation shared_genome_proportion kinship kinship_degree ersa_degree ersa_lower_bound ersa_upper_bound shared_ancestors final_degree total_seg_len seg_count
g1-b1-i1 g2-b1-i1 1 PO 0.4996 0.2493 1.0 1 1 1 0.0 1 3359.9362945470302 40
g1-b1-i1 g2-b2-i1 1 PO 0.4997 0.2459 1.0 1 1 1 0.0 1 3362.012253715002 40
g1-b1-i1 g2-b3-i1 1 PO 0.4999 0.2467 1.0 1 1 1 0.0 1 3363.150814131464 40
g1-b1-i1 g3-b1-i1 2 2 0.2369 0.1163 2.0 2 2 2 1.0 2 1644.634182188072 60
  • id1 - ID of the first sample in a pair of relatives.

  • id2 - ID of the second sample in a pair of relatives, id1 is always less than id2 .

  • king_degree - Numeric degree of relationship estimated by KING;

    • 0 means duplicates or MZ twins,

    • 1 means parent-offspring (PO),

    • 2 can be either full siblings (FS), half siblings, or grandmother/grandfather with a granddaughter/grandson.

    • 3 is aunt/uncle with a niece/nephew, as described in the table at https://en.wikipedia.org/wiki/Coefficient_of_relationship.

    If king_degree exists, then final_degree will be equal to king_degree .

  • king_relation - further differentiation within the first 2 degrees by KING; king_degree equal to 1 means PO (parent-offspring), and KING also detects FS for some 2nd-degree pairs.

  • shared_genome_proportion is the approximate fraction of the genome shared by two individuals. It should be approximately 0.5 for PO and FS, and 0.25 for grandmother-granddaughter and half siblings. For the first 3 degrees it is calculated as the total length of IBD2 segments plus half of the total length of IBD1 segments. For 4th and more distant degrees it is simply half of the total length of IBD1 segments.

  • kinship is the KING kinship coefficient.

  • ersa_degree is the degree estimated from IBD segments by ERSA; it is used as the final_degree when king_degree does not exist.

  • ersa_lower_bound / ersa_upper_bound are the lower / upper bounds of the degree estimated by ERSA using a confidence interval corresponding to the significance level α ( --alpha ). According to the ERSA likelihood model, the true degree falls outside this interval with probability α.

  • shared_ancestors is the most likely number of shared ancestors. If it equals 0 , then one relative is a direct descendant of the other; if it equals 1 , then they most probably have one common ancestor and represent half siblings; if it equals 2 , then, for example, the individuals may share both a mother and a father.

  • final_degree is the final relationship degree predicted by GRAPE. If KING is used ( --flow ibis-king or --flow germline-king ), it equals king_degree for close relatives up to the 3rd degree, and ersa_degree for distant relatives.

  • total_seg_len is the total length of all IBD1 segments. It's calculated using IBIS or GERMLINE IBD data. If KING is involved, then for the first 3 degrees it's calculated using KING IBD data.

  • seg_count is the total number of all IBD segments found by IBIS / GERMLINE tools. If KING is involved, the total number of found IBD segments is taken from KING for the first 3 degrees.
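
As an illustration of working with this file, the sketch below loads it with pandas and keeps only close relatives; the file name relatives.tsv is a placeholder, and the column names follow the description above.

import pandas as pd

# One row per detected pair of relatives, tab-separated.
relatives = pd.read_csv('relatives.tsv', sep='\t')

# Keep pairs predicted as 3rd degree or closer, sorted by shared genome proportion.
close = relatives[relatives['final_degree'] <= 3].sort_values(
    'shared_genome_proportion', ascending=False)

print(close[['id1', 'id2', 'final_degree', 'total_seg_len', 'seg_count']].head())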

IBD Segments Weighting

Distribution of IBD segments among non-related (background) individuals within a population may be quite heterogeneous. There may exist genome regions with extremely high rates of overall matching, which are not inherited from the recent common ancestors. Instead, these regions more likely reflect other demographic factors of the population.

The implication is that IBD segments detected in such regions are expected to be less useful for estimating recent relationships. Moreover, such regions are potentially prone to false-positive IBD segments.

GRAPE provides two options to address this issue.

The first option is based on a genome-region exclusion mask, wherein some genome regions are completely excluded from consideration. This approach is implemented in ERSA and is used by GRAPE by default.

The second option is based on IBD segment weighting.

The key idea is to down-weight an IBD segment, i.e. reduce its effective length, if the segment crosses regions with a high rate of matching.

Down-weighted segments are then passed to the ERSA algorithm.
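
For intuition, here is a minimal sketch of such down-weighting; the mask representation (per-region weights in genetic-map coordinates) and the helper names are assumptions made for illustration and do not reflect GRAPE's actual data structures.

from dataclasses import dataclass

@dataclass
class Region:
    chrom: str
    start_cm: float
    end_cm: float
    weight: float  # < 1.0 down-weights IBD length overlapping this region

def weighted_segment_length(chrom, start_cm, end_cm, mask):
    # Reduce the effective length of an IBD segment where it overlaps
    # high-matching regions described by the mask.
    length = end_cm - start_cm
    for region in mask:
        if region.chrom != chrom:
            continue
        overlap = min(end_cm, region.end_cm) - max(start_cm, region.start_cm)
        if overlap > 0:
            length -= overlap * (1.0 - region.weight)
    return max(length, 0.0)

# A 12 cM segment crossing a high-matching region weighted at 0.3:
mask = [Region('2', 85.0, 100.0, 0.3)]
print(weighted_segment_length('2', 90.0, 102.0, mask))  # 12 - 10 * 0.7 = 5.0 cM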

GRAPE provides the ability to compute the weight mask from a VCF file with presumably unrelated individuals. The mask is then used during relatedness detection by specifying the --weight-mask flag of the launcher.

See more information in our GRAPE preprint .

Computation of the IBD Segments Weighting Mask

docker run --rm -it -v /media:/media \
 grape:latest launcher.py compute-weight-mask \
 --directory /media/background --assembly hg37 \
 --real-run --ibis-seg-len 5 --ibis-min-snp 400

The resulting files consist of a weight mask file in JSON format and a visualization of the mask, stored in the /media/background/weight-mask/ folder.

Usage of the IBD Segments Weighting Mask

docker run --rm -it -v /media:/media \
 grape:latest launcher.py find --flow ibis --ref-directory /media/ref \
 --weight-mask /media/background/weight-mask/mask.json \
 --directory /media/data --assembly hg37 \
 --real-run --ibis-seg-len 5 --ibis-min-snp 400

Execution by Scheduler

The pipeline can be executed using the lightweight scheduler Funnel , which implements the Task Execution Schema developed by GA4GH .

During execution, the input data for analysis can be obtained in several ways: locally or via FTP, HTTPS, S3, Google storage, etc. The resulting files can be uploaded in the same ways.

It is also possible to add other features, such as writing results to a database or sending them to a REST service.

The scheduler itself can work in various environments, from a regular VM to a Kubernetes cluster with resource quota support.

For more information see the doc .

How to use Funnel:

# Start the Funnel server
/path/to/funnel server run
# Use Funnel as client
/path/to/funnel task create funnel/grape-real-run.json

Launch with Dockstore

GRAPE supports execution with the Dockstore.

Dockstore page for the GRAPE pipeline can be found here .

  1. Install dockstore .

  2. Clone the GRAPE repository.

  3. Download reference datasets.

    There are four available options:

# Download minimal reference datasets from publicly available sources (without phasing and imputation)
dockstore tool launch --local-entry workflows/reference/cwl/ref_min.cwl --json workflows/reference/cwl/config.json --script
# Download complete reference datasets from publicly available sources (for phasing and imputation)
dockstore tool launch --local-entry workflows/reference/cwl/ref.cwl --json workflows/reference/cwl/config.json --script
# Download minimal reference datasets as a bundle (without phasing and imputation)
dockstore tool launch --local-entry workflows/bundle/cwl/bundle_min.cwl --json workflows/reference/bundle/config.json --script
# Download complete reference datasets as a bundle (for phasing and imputation)
dockstore tool launch --local-entry workflows/bundle/cwl/bundle.cwl --json workflows/reference/bundle/config.json --script
  4. Run the preprocessing.
dockstore tool launch --local-entry workflows/preprocess2/cwl/preprocess.cwl --json workflows/preprocess2/cwl/config.json --script
  5. Run the relatedness inference workflow.
dockstore tool launch --local-entry workflows/find/cwl/find.cwl --json workflows/find/cwl/config.json --script

Each pipeline step requires a config.json file.

Each config has predefined directories and paths, but you can override them by editing these files.

Also note that Dockstore saves its runtime data in the datastore directory. This directory grows with each run, so we recommend cleaning it up after each step, especially after downloading the references.

Evaluation on Simulated Data

We added a simulation workflow to GRAPE to perform a precision / recall analysis of the pipeline.

It is accessible via the simulate command of the pipeline launcher and incorporates the following steps:

  1. Pedigree simulation with unrelated founders; we use the Ped-sim simulation package.

  2. Estimation of relatedness degrees;

  3. Comparison between true and estimated degrees.

The source dataset for the simulation is taken from the CEU population data of the 1000 Genomes Project. As the CEU data consists of trios, we picked at most one member of each trio as a founder.

We also ran GRAPE on selected individuals to remove all cryptic relationships up to the 6th degree.

Then, we randomly assigned sex to each individual and used sex-specific genetic maps to take into account the differences in recombination rates between men and women.

A visualization of the structure of a simulated pedigree is given below:

pedigree

How to Run Simulation

Use the simulate command of the GRAPE launcher.

docker run --rm -it -v /media:/media -v /etc/localtime:/etc/localtime:ro \
 grape:latest launcher.py simulate --flow ibis-king --ref-directory /media/ref \
 --directory /media/data --sim-params-file params/relatives_average.def \
 --sim-samples-file ceph_unrelated_all.tsv --assembly hg37 --ibis-seg-len 5 \
 --ibis-min-snp 400 --zero-seg-count 0.1 --zero-seg-len 5 --alpha 0.01 --real-run

Precision / Recall Analysis

For each degree of relationship i, we computed precision and recall metrics:

\mathrm{Precision}(i) = \frac{\mathrm{TP}(i)}{\mathrm{TP}(i) + \mathrm{FP}(i)}; \quad \mathrm{Recall}(i) = \frac{\mathrm{TP}(i)}{\mathrm{TP}(i) + \mathrm{FN}(i)}

Here TP(i), FP(i), FN(i) are the numbers of true positive, false positive, and false negative relationship matches predicted for the degree i .

In our analysis we used non-exact (fuzzy) interval metrics; a sketch of this evaluation is given after the list below.

  • For the 1st degree, we require an exact match.

  • For the 2nd, 3rd, and 4th degrees, we allow a degree interval of ±1. For example, for the 2nd true degree we consider a predicted 3rd degree as a true positive match.

  • For the 5th+ degrees, we use the ERSA confidence intervals which are typically 3-4 degrees wide.

  • For 10th+ degrees, these intervals are 6-7 degrees wide.
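
The following is a minimal sketch of this fuzzy evaluation, assuming the true and predicted degrees are available as parallel lists; the fixed window used for 5th+ degrees stands in for the ERSA confidence intervals and is an assumption of the example.

from collections import defaultdict

def degree_matches(true_degree, predicted_degree):
    # Exact match for the 1st degree, +/-1 for degrees 2-4, and a fixed
    # placeholder window for more distant degrees (GRAPE uses the ERSA
    # confidence interval there instead).
    if true_degree == 1:
        return predicted_degree == 1
    if 2 <= true_degree <= 4:
        return abs(predicted_degree - true_degree) <= 1
    return abs(predicted_degree - true_degree) <= 2

def precision_recall_by_degree(true_degrees, predicted_degrees):
    tp, fp, fn = defaultdict(int), defaultdict(int), defaultdict(int)
    for true_d, pred_d in zip(true_degrees, predicted_degrees):
        if pred_d is None:                   # pair was not detected at all
            fn[true_d] += 1
        elif degree_matches(true_d, pred_d):
            tp[true_d] += 1
        else:
            fp[pred_d] += 1                  # a wrong degree was reported
            fn[true_d] += 1
    degrees = sorted(set(tp) | set(fp) | set(fn))
    return {d: (tp[d] / (tp[d] + fp[d]) if tp[d] + fp[d] else 0.0,
                tp[d] / (tp[d] + fn[d]) if tp[d] + fn[d] else 0.0)
            for d in degrees}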

precision & recall

Known Limitations

It is known that for some small isolated populations IBD sharing is very high. Therefore, our pipeline overestimates the relationship degree for them.

It is not recommended to mix standard populations like CEU with small isolated populations, such as native ones.

This problem is discussed in https://journals.plos.org/plosone/article?id=10.1371/journal.pone.0034267.

Credits

License: GNU GPL v3

Support

Join our Telegram chat for the support: https://t.me/joinchat/NsX48w4OzcpkNTRi.

Code Snippets

shell:
    '''
        bcftools annotate --set-id "%CHROM:%POS:%REF:%FIRST_ALT" {input.vcf} -O z -o {output.vcf}
    '''
script:
    '../scripts/select_bad_samples.py'
script:
    '../scripts/plink_filter.py'
script:
    '../scripts/pre_imputation_check.py'
shell:
    """
    # remove duplicates
    cut -f 2 {params.input}.bim | sort | uniq -d > plink/{wildcards.batch}_snp.dups
    plink --bfile {params.input}          --exclude       plink/{wildcards.batch}_snp.dups                  --make-bed --out plink/{wildcards.batch}_merged_filter_dub   |& tee -a {log}
    plink --bfile {params.input}          --extract       plink/{wildcards.batch}_merged_filter.bim.freq    --make-bed --out plink/{wildcards.batch}_merged_freq         |& tee -a {log}
    plink --bfile plink/{wildcards.batch}_merged_freq       --extract       plink/{wildcards.batch}_merged_filter.bim.chr     --make-bed --out plink/{wildcards.batch}_merged_extracted    |& tee -a {log}
    plink --bfile plink/{wildcards.batch}_merged_extracted  --flip          plink/{wildcards.batch}_merged_filter.bim.flip    --make-bed --out plink/{wildcards.batch}_merged_flipped      |& tee -a {log}
    plink --bfile plink/{wildcards.batch}_merged_flipped    --update-chr    plink/{wildcards.batch}_merged_filter.bim.chr     --make-bed --out plink/{wildcards.batch}_merged_chroped      |& tee -a {log}
    plink --bfile plink/{wildcards.batch}_merged_chroped    --update-map    plink/{wildcards.batch}_merged_filter.bim.pos     --make-bed --out {params.out}              |& tee -a {log}
    rm plink/{wildcards.batch}_merged_filter_dub.*
    rm plink/{wildcards.batch}_merged_freq.*
    rm plink/{wildcards.batch}_merged_extracted.*
    rm plink/{wildcards.batch}_merged_flipped.*
    rm plink/{wildcards.batch}_merged_chroped.*
    """
shell:
    """
    plink --bfile {params.input} --a1-allele plink/{wildcards.batch}_merged_filter.bim.force_allele --make-bed --out plink/{wildcards.batch}_merged_mapped_alleled |& tee -a {log.plink}
    plink --bfile plink/{wildcards.batch}_merged_mapped_alleled --keep-allele-order --output-chr M --export vcf bgz --out vcf/{wildcards.batch}_merged_mapped_clean |& tee -a {log.vcf}
    mkdir vcf/temp_{wildcards.batch}
    bcftools sort -T vcf/temp_{wildcards.batch} vcf/{wildcards.batch}_merged_mapped_clean.vcf.gz -O z -o {output.temp_vcf} |& tee -a {log.vcf}
    bcftools index -f {output.temp_vcf} |& tee -a {log.vcf}
    # need to check output for the potential issues
    bcftools view {output.temp_vcf} --regions 1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22 -O b -o {output.bcf}
    bcftools norm --check-ref e -f {GRCH37_FASTA} vcf/{wildcards.batch}_merged_mapped_sorted.bcf.gz -O u -o /dev/null |& tee -a {log.vcf}
    bcftools index -f {output.bcf} | tee -a {log.vcf}
    """
shell:
    '''
        minimac4 \
        --refHaps {input.refHaps} \
        --haps phase/{wildcards.batch}_chr{wildcards.chrom}.phased.vcf.gz \
        --format GT,GP \
        --prefix imputed/{wildcards.batch}_chr{wildcards.chrom}.imputed \
        --minRatio 0.01 |& tee {log}
    '''
shell:
    '''
    FILTER="'strlen(REF)=1 & strlen(ALT)=1'"

    bcftools view -i 'strlen(REF)=1 & strlen(ALT)=1' imputed/{wildcards.batch}_chr{wildcards.chrom}.imputed.dose.vcf.gz -v snps -m 2 -M 2 -O z -o imputed/{wildcards.batch}_chr{wildcards.chrom}.imputed.dose.pass.vcf.gz |& tee {log}
    '''
shell:
    '''
    # for now just skip empty files
    true > {params.list} && \
    for i in {input}; do
        if [ -s $i ]
        then
            echo $i >> {params.list}
        else
            continue
        fi
    done

    bcftools concat -f {params.list} -O z -o {output} |& tee -a {log}
    bcftools index -f {output} |& tee -a {log}

    # check if there is a background data and merge it
    if [ -f "{params.merged_imputed}" ] && [ "{params.mode}" = "client" ]; then
        mv {output} {output}.client
        bcftools merge --force-samples {params.merged_imputed} {output}.client -O z -o {output} |& tee -a {log}
        bcftools index -f {output} |& tee -a {log}
    fi
    '''
shell:
    '''
    plink --vcf {input} --make-bed --out {params.out} |& tee {log}
    '''
shell:
    '''
    # please mind a merge step in merge_imputation_filter for germline
    plink --vcf {input} --make-bed --out {params.out} | tee {log}
    plink --bfile {params.background} --bmerge {params.out} --make-bed --out plink/{wildcards.batch}_merged_imputed |& tee {log}
    '''
shell:
    '''
        bcftools index -f -t {input.bcf} |& tee {log}
    '''
shell:
    '''
    eagle --vcfRef {input.vcfRef} \
    --vcfTarget {input.vcf}  \
    --geneticMapFile {GENETIC_MAP} \
    --chrom {wildcards.chrom} \
    --vcfOutFormat b \
    --pbwtIters 2 \
    --Kpbwt 20000 \
    --outPrefix phase/{wildcards.batch}_chr{wildcards.chrom}.phased |& tee {log}
    '''
shell:
    '''
    # for now just skip empty files
    true > {params.list} && \
    for i in {input}; do
        if [ -s $i ]
        then
            echo $i >> {params.list}
        else
            continue
        fi
    done

    bcftools concat -f {params.list} -O z -o {output} |& tee -a {log}
    bcftools index -f {output} |& tee -a {log}

    # check if there is a background data and merge it
    if [ -f "background/{wildcards.batch}_merged_imputed.vcf.gz" ] && [ "{params.mode}" = "client" ]; then
        mv {output} {output}.client
        bcftools merge --force-samples background/{wildcards.batch}_merged_imputed.vcf.gz {output}.client -O b -o {output} |& tee -a {log}
        bcftools index -f {output} |& tee -a {log}
    fi
    '''
shell:
    '''
    bcftools query --list-samples input.vcf.gz >> vcf/samples.txt
    total_lines=$(wc -l < vcf/samples.txt)
    num_files={params.num_batches}
    ((lines_per_file = (total_lines + num_files - 1) / num_files))
    split -l $lines_per_file vcf/samples.txt vcf/batch --additional-suffix=.txt --numeric-suffixes=1
    for file in $(find vcf -name 'batch0[1-9].txt')
    do
        new=$(echo "$file" | sed "s/0//g")
        mv "$file" "$new"
    done
    '''
shell:
    '''
    bcftools view -S {input.samples} {input.vcf} -O z -o {output.vcf} --force-samples
    '''
shell:
    '''
        ln -sr {input.vcf} {output.vcf}
    '''
script: '../scripts/remove_imputation.py'
shell:
    '''
        ln -sr {input.vcf} {output.vcf}
    '''
shell:
    '''
       JAVA_OPTS="-Xmx{params.mem_gb}g" picard -Xmx12g LiftoverVcf WARN_ON_MISSING_CONTIG=true MAX_RECORDS_IN_RAM=5000 I={input.vcf} O={output.vcf} CHAIN={LIFT_CHAIN} REJECT=vcf/chr{wildcards.batch}_rejected.vcf.gz R={GRCH37_FASTA} |& tee -a {log}
    '''
shell:
    '''
        ln -sr {input.vcf} {output.vcf}
    '''
shell:
    '''
        bcftools annotate --set-id "%CHROM:%POS:%REF:%FIRST_ALT" {input.vcf} | \
        bcftools view -t ^8:10428647-13469693,21:16344186-19375168,10:44555093-53240188,22:16051881-25095451,2:85304243-99558013,1:118434520-153401108,15:20060673-25145260,17:77186666-78417478,15:27115823-30295750,17:59518083-64970531,2:132695025-141442636,16:19393068-24031556,2:192352906-198110229 | \
        bcftools view --min-af 0.05 --types snps -O b -o {output.bcf}
    '''
shell:
    '''
        ln -sr {input.bcf} {output.bcf}
    '''
shell:
    '''
        bcftools view {input.bcf} -O z -o {output.vcf}
    '''
shell:
    '''
    plink --vcf {input} --make-bed --out {params.out} |& tee {log}
    '''
shell:
    '''
    rm files_list.txt || true
    for file in {input}
    do
        if [[ $file == *.fam ]]
        then
            echo ${{file%.*}} >> files_list.txt
        fi
    done

    plink --merge-list files_list.txt --make-bed --out preprocessed/data
    rm files_list.txt
    '''
shell:
    '''
    bcftools index -f {input.batches_vcf}
    '''
shell:
    '''
    rm complete_vcf_list.txt || true
    for FILE in {input}
    do
        if [[ $FILE == *.gz ]]
        then
            echo $FILE >> complete_vcf_list.txt
        fi
    done
    bcftools merge --threads {threads} --file-list complete_vcf_list.txt --force-samples -O z -o {output.vcf}
    rm complete_vcf_list.txt
    '''
shell:
    '''
    plink --vcf {input} --make-bed --out {params.out} |& tee {log}
    '''
shell:
    '''
    (add-map-plink.pl -cm {input.bim} {params.genetic_map_GRCh37}> {output}) |& tee -a {log}
    '''
shell:
    '''
        bcftools query --list-samples {input.bcf} >> {output.fam}
    '''
shell:
    """
    KING_DEGREE=3

    king -b {input.bed} --cpus {threads} --ibdseg --degree $KING_DEGREE --prefix {params.out} |& tee {log}
    king -b {input.bed} --cpus {threads} --kinship --degree 4 --prefix {params.kin} |& tee -a {log}

    # we need at least an empty file for the downstream analysis
    if [ ! -f "{output.king}" ]; then
        touch {output.king}
    fi
    if [ ! -f "{output.kinship}" ]; then
        touch {output.kinship}
    fi
    if [ ! -f "{output.kinship0}" ]; then
        touch {output.kinship0}
    fi
    if [ ! -f "{output.segments}" ]; then
        touch {output.segments}
    fi
    """
shell:
    """
    ibis {input.bed} {input.bim} {input.fam} -t {threads} -mt {params.mT} -mL {params.mL} -ibd2 -mL2 3 -hbd -f ibis/merged_ibis |& tee -a {log}
    """
shell:
    """
    python {input.script} \
        --input-ibd-segments-file {input.ibd} \
        --mask-file {params.mask} \
        --output-ibd-segments-file {output.ibd}
    """
script:
    "../scripts/transform_ibis_segments.py"
shell:
    """
    zcat {input.ibd} > {output.ibd}
    ersa --avuncular-adj -ci -a {params.a} --dmax 14 -t {params.t} -l {params.l} \
            {params.r} -th {params.th} {output.ibd} -o {output.relatives} |& tee {log}
    """
shell:
    """
    FILES="{input}"
    for input_file in $FILES; do
        if [[ "$input_file" == "${{FILES[0]}}" ]]; then
            cat $input_file >> {output}
        else
            sed 1d $input_file >> {output}
        fi
    done
    """
script:
    "../scripts/split_map.py"
script: "../scripts/merge_king_ersa.py"
shell:
    """
        plink --bfile {params.bfile} --chr {params.chr} --make-bed --out ibis_data/{params.chr} --threads {threads}
    """
shell:
    '''
    (add-map-plink.pl -cm {input.bim} {params.genetic_map_GRCh37}> {output}) |& tee -a {log}
    '''
shell:
    """
        ibis {input.bed} {input.bim} {input.fam} -t {threads} -mt {params.mT} -mL {params.mL} -ibd2 -mL2 3 -hbd -gzip -f ibis/merged_ibis_{params.chr} |& tee -a {log}
    """
shell:
    """
        zcat {input} >> {output.ibd}
    """
shell:
    """
    python {input.script} \
        --input-ibd-segments-file {input.ibd} \
        --mask-file {params.mask} \
        --output-ibd-segments-file {output.ibd}
    """
script:
    "../scripts/transform_ibis_segments.py"
shell:
    """
    zcat {input.ibd} > {output.ibd}
    ersa --avuncular-adj -ci -a {params.a} --dmax 14 -t {params.t} -l {params.l} \
            {params.r} -th {params.th} {output.ibd} -o {output.relatives} |& tee {log}
    """
shell:
    """
    FILES="{input}"
    for input_file in $FILES; do
        if [[ "$input_file" == "${{FILES[0]}}" ]]; then
            cat $input_file >> {output}
        else
            sed 1d $input_file >> {output}
        fi
    done
    """
script: "../scripts/postprocess_ersa.py"
shell:
    """
    bcftools index {input} | tee {log}
    """
shell:
    """
    bcftools filter {input} -r {wildcards.chrom} -O z -o vcf/for_rapid_{wildcards.chrom}.vcf.gz | tee {log}
    """
script:
    "../scripts/estimate_rapid_params.py"
script:
    "../scripts/interpolate.py"
    script:
        "../scripts/interpolate.py"

'''
rule erase_dosages:
    input:
        vcf=rules.index_and_split.output[0]
    output:
        vcf='vcf/erased_chr{chrom}.vcf.gz'
    params:
        vcf='vcf/erased_chr{chrom}'
    conda:
        'bcftools'
    shell:
        "bcftools annotate -x 'fmt' {input.vcf} -O z -o {output.vcf}"
'''
shell:
    """
        rapidibd -i {input.vcf} -g {input.g} -d {params.min_cm_len} -o {params.output_folder}/chr{wildcards.chrom} \
        -w {params.window_size} -r {params.num_runs} -s {params.num_success}
    """
shell:
    """
        for file in {input}
        do
            zcat ${{file}} >> {output}
        done
    """
script:
    "../scripts/transform_rapid_segments.py"
shell:
    """
    touch {output}
    FILES="{input.ibd}"
    TEMPFILE=ersa/temp_relatives.tsv
    rm -f $TEMPFILE
    rm -f {output}

    for input_file in $FILES; do
        ersa --avuncular-adj -ci -a {params.a} --dmax 14 -t {params.t} -l {params.l} \
            {params.r} -th {params.th} $input_file -o $TEMPFILE |& tee {log}

        if [[ "$input_file" == "${{FILES[0]}}" ]]; then
            cat $TEMPFILE >> {output}
        else
            sed 1d $TEMPFILE >> {output}
        fi
    done
    """
script: "../scripts/postprocess_ersa.py"
shell:
    """
    KING_DEGREE=3

    king -b {params.input}.bed --cpus {threads} --ibdseg --degree $KING_DEGREE --prefix {params.out} |& tee {log}
    king -b {params.input}.bed --cpus {threads} --kinship --degree $KING_DEGREE --prefix {params.kin} |& tee -a {log}

    # we need at least an empty file for the downstream analysis
    if [ ! -f "{output.king}" ]; then
        touch {output.king}
    fi
    if [ ! -f "{output.kinship}" ]; then
        touch {output.kinship}
    fi
    if [ ! -f "{output.kinship0}" ]; then
        touch {output.kinship0}
    fi
    if [ ! -f "{output.segments}" ]; then
        touch {output.segments}
    fi
    """
shell: "bcftools index -f {input}"
shell:
    """
    bcftools filter -r {wildcards.chrom} {input.vcf} | bcftools norm --rm-dup none -O z -o vcf/imputed_chr{wildcards.chrom}.vcf.gz |& tee {log}
    """
script:
    "../scripts/vcf_to_ped.py"
shell:
    """
    plink --file ped/imputed_chr{wildcards.chrom} --keep-allele-order --cm-map {input.cmmap} {wildcards.chrom} --recode --out cm/chr{wildcards.chrom}.cm |& tee {log}
    """
shell:
    """
    touch {output}
    germline -input ped/imputed_chr{wildcards.chrom}.ped cm/chr{wildcards.chrom}.cm.map -min_m 2.5 -err_hom 2 -err_het 1 -output germline/chr{wildcards.chrom}.germline |& tee {log}
    # TODO: germline returns some length in BP instead of cM - clean up is needed
    set +e
    grep -v MB germline/chr{wildcards.chrom}.germline.match > germline/chr{wildcards.chrom}.germline.match.clean && mv germline/chr{wildcards.chrom}.germline.match.clean germline/chr{wildcards.chrom}.germline.match
    set -e
    """
shell:
     "cat germline/*.match > {output}"
script:
    '../scripts/merge_ibd.py'
shell:
    """
    touch {output}
    ersa --avuncular-adj -ci --alpha {params.alpha} --dmax 14 -t {params.ersa_t} -l {params.ersa_l} -th {params.ersa_th} {input.ibd} -o {output} |& tee {log}
    """
script: "../scripts/merge_king_ersa.py"
import scipy
import scipy.optimize
import random
import math
import gzip
import logging
from collections import namedtuple
import statsmodels.api as sm
lowess = sm.nonparametric.lowess
import re
import operator as op
from functools import reduce


import numpy as np

w_rho_vals = []

def compute_mafs(vcf_input,max_window_size):
        global w_rho_vals
       # w_rho_vals.append(1)
        f = gzip.open(vcf_input,'rt')
        entries_started = False

        expected_vals = []

        site_counter = 0
        mafs = []
        for line in f:
                if ('#CHROM' in line):
                        entries_started = True
                elif (entries_started):
                        _values = line.split()
                        _pos = _values[1]
                        alt = _values[4]
                        if (len(alt.split(',')) > 1):
                           continue
                        i = 2
                        while(i < len(_values) and _values[i] != 'GT'):
                           i += 1
                        i += 1
                        tags = _values[7]
                        i = 9
                        num_ones = 0
                        num_zeros = 0
                        while (i < len(_values)):
                                        vals = re.split('\||/',_values[i])
                                        if (vals[0] == '1'):
                                                num_ones = num_ones + 1
                                        elif (vals[0] == '0'):
                                                num_zeros = num_zeros + 1
                                        if (vals[1] == '1'):
                                                num_ones = num_ones + 1
                                        elif (vals[1] == '0'):
                                                num_zeros = num_zeros + 1
                                        i = i + 1
                        v =  min(num_zeros,num_ones)/float(num_zeros+num_ones)
                        mafs.append(v)
        f.close()
        x_vals = [0]
        y_vals = [0]
        for o in range(1,max_window_size):
                window_size = o
                expected_vals = []
                for i in range(0,len(mafs),window_size):
                        expected_maf = 0
                        _sum = 0.0
                        for j in range(0,window_size):
                                if (i + j  >= len(mafs)):
                                        continue
                                _sum += mafs[i+j]
                        for j in range(0,window_size):
                                if (i + j >= len(mafs)):
                                        continue
                                if (_sum != 0):
                                        expected_maf += (mafs[i+j] * mafs[i+j]/_sum)
                        expected_vals.append(expected_maf)

                import numpy
                _a = numpy.array(expected_vals)

                pa = numpy.percentile(_a,1)
                rho_v = pa*pa + (1-pa)*(1-pa)
                x_vals.append(o)
                y_vals.append(rho_v)

        w_rho_vals = lowess(y_vals, x_vals,frac=0.1,return_sorted=False)
        #for k in range(0,len(x_vals)):
        #        print (x_vals[k],w_rho_vals[k])
        #       for j in w_rho_vals:
        #       print l,rho_v

def ncr_old(n, r):
    r = min(r, n-r)
    numer = reduce(op.mul, range(n, n-r, -1), 1)
    denom = reduce(op.mul, range(1, r+1), 1)
    return numer / denom


def w_rho_func(_w):

    if (_w >= len(w_rho_vals)):
            return w_rho_vals[len(w_rho_vals)-1]
    else:
     #   w = int(round(_w,0))
        return w_rho_vals[int(_w)]


def ncr(n,r):
    f = math.factorial
    return f(n) / f(r) / f(n-r)

def fp(e,N,L,rho,r,c,w):
    sum = 0.0
    #rho = w_rho_func(w)
    for i in range(0,c):
        sum += (ncr(r,i)* ((rho**(L/w))**i) * ( (1-rho**(L/w))**(r-i) ))
    return 1 - sum

def tp(er,N,L,rho,r,c,w):
    sum = 0.0
    for i in range(0,c):
        sum += (ncr(r,i) * (math.exp(-(er*L)/w)**i) * ((1-math.exp(-(L*er)/w))**(r-i)) )
    return 1 - sum


def compute_w(error_rate,N,L,rho,r,c,max_w=300):
	fun = lambda w: 0.5* N*(N-1)*fp(error_rate,N,L,w_rho_func(w),r,c,w) / tp(error_rate,N,L,w_rho_func(int(w)),r,c,w)

    #bnds = ((0, None))
    #x0 = 40
	lambda_val = 0.5* N*(N-1)
	w_min = -1
	_started = False
	for w in range(1,max_w):
		_tp = tp(error_rate,N,L,w_rho_func(int(w)),r,c,w)
		_fp = fp(error_rate,N,L,w_rho_func(w),r,c,w)
		if (round(_tp,2) -  0.5* N*(N-1)* round(_fp,2)) == 1:
			if (_started):
				continue
			else:
				w_min = w
				_started = True
		elif _started:
			print (w_min, w)
			return w_min, w
	return w_min, max_w


	#cons = ({'type': 'ineq', 'fun': lambda w:  0.5*N*(N-1)*fp(error_rate,N,L,w_rho_func(w),r,c,w) - tp(error_rate,N,L,w_rho_func(int(w)),r,c,w)})
    #res = scipy.optimize.minimize(fun,[x0], method='COBYLA', tol=1e-1,bounds=bnds)#,constraints=cons)


    #print (res)


if __name__ == '__main__':

    try:
        snakemake
    except NameError:
        test_dir = 'test_data/rapid'
        Snakemake = namedtuple('Snakemake', ['input', 'output', 'params', 'log'])
        snakemake = Snakemake(
            input={'vcf': f'test_data/rapid/phased/chr1.phased.vcf.gz',
                   'num_haplotypes': f'{2504*2}'},

            output=[f'{test_dir}/rapid_params'],
            params={'error_rate': '0.0025',
                    'min_snps': '600',
                    'num_runs': '10',
                    'num_success': '2'},
            log=[f'{test_dir}/estimate_rapid_params.log']
        )

    logging.basicConfig(filename=snakemake.log[0], level=logging.DEBUG, format='%(levelname)s:%(asctime)s %(message)s')

    vcf_input = snakemake.input['vcf']
    error_rate = float(snakemake.params['error_rate'])
    num_haplotypes = int(snakemake.input['num_haplotypes'])
    min_snps = int(snakemake.params['min_snps'])

    num_runs = int(snakemake.params['num_runs'])
    num_success = int(snakemake.params['num_success'])

    max_window_size = 300
    #error_rate = 0.0025
    #min_length_SNPs = 12000#14000
    rho_initial = 0.9
    compute_mafs(vcf_input, max_window_size)
    print(f'MAFS were computed successfully')
    w_min, w_max = compute_w(error_rate, num_haplotypes, min_snps, rho_initial, num_runs, num_success, max_window_size)
    print(f'w_min is {w_min}, w_max is {w_max}')
    with open(snakemake.output[0], 'w') as out:
        out.write(f'W={w_min}')
import pandas
import numpy
from utils.bcftools import bcftools_query


def interpolate(vcf_path, cm_path, output_path, chrom):

    genetic_map = pandas.read_csv(cm_path, sep=" ")

    pos = list(map(int, bcftools_query(vcf_path, arg="%POS\n", list_samples=False)))
    # interpolate CM from BP
    #chro = genetic_map[genetic_map.chr == int(chrom)]
    # because for chr19 'Genetic_Map(cM)' is 'Genetic_Map.cM.'
    cms = numpy.interp(pos, genetic_map.position.values, genetic_map.iloc[:, -1].values)

    eps = 1e-7
    new_cms = []
    for i in range(0, cms.shape[0]):
        if i == 0:
            new_cms.append(cms[i])
            continue

        if new_cms[-1] >= cms[i]:
            new_cms.append(new_cms[-1] + eps)
        else:
            new_cms.append(cms[i])

    with open(output_path, 'w') as w_map:
        for i, cm in enumerate(new_cms):
            if i > 0 and cm <= new_cms[i-1]:
                raise ValueError(f'cm positions {i-1} and {i} are NOT strictly increasing: {new_cms[:i+1]}')
            w_map.write(f"{i}\t{cm:.8f}\n")


if __name__ == '__main__':
    vcf_path = snakemake.input['vcf']
    cm_path = snakemake.input['cmmap']

    chrom = snakemake.wildcards['chrom']

    output_path = snakemake.output[0]
    interpolate(vcf_path, cm_path, output_path, chrom)
from utils.ibd import merge_germline_segments, segments_to_germline, interpolate_all, read_germline_segments, read_pedsim_segments


if __name__ == '__main__':

    germline_path = snakemake.input['germline']
    map_dir = snakemake.params['cm_dir']
    gap = float(snakemake.params['merge_gap'])
    pedsim_path = snakemake.params['true_ibd']

    use_true_ibd = bool(snakemake.params['use_true_ibd'])
    if not use_true_ibd:
        segments = read_germline_segments(germline_path)
        segments = interpolate_all(segments, map_dir)
        segments = merge_germline_segments(segments, gap=gap)
        segments_to_germline(segments, snakemake.output['ibd'])
    else:
        true_segments = read_pedsim_segments(pedsim_path)
        segments_to_germline(true_segments, snakemake.output['ibd'])
import pandas
import numpy
import os
import logging
from collections import Counter, namedtuple
from utils.ibd import read_king_segments as rks, interpolate_all, Segment


def is_non_zero_file(fpath):
    return os.path.isfile(fpath) and os.path.getsize(fpath) > 0


def _sort_ids(data):
    unsorted_mask = data.id2 < data.id1
    data.loc[unsorted_mask, 'id1'], data.loc[unsorted_mask, 'id2'] = data.loc[unsorted_mask, 'id2'], data.loc[
        unsorted_mask, 'id1']


def read_bucket_dir(bucket_dir):

    total = None
    for file in os.listdir(bucket_dir):
        if not file.endswith('tsv'):
            continue
        path = os.path.join(bucket_dir, file)
        bucket = read_germline(path)
        if total is None:
            total = bucket
        else:
            total = total.append(bucket)
    return total


def read_germline(ibd_path):
    germline_names = [
        'fid_iid1',
        'fid_iid2',
        'chrom',
        'start_end_bp',
        'start_end_snp',
        'snp_len',
        'genetic_len',
        'len_units',
        'mismatches',
        'is_homozygous1',
        'is_homozygous2'
    ]

    data = pandas.read_table(ibd_path, header=None, names=germline_names)
    if data.shape[0] == 0:
        return pandas.DataFrame(columns=['id1', 'id2', 'seg_count_germline', 'total_seg_len_germline']).set_index(['id1', 'id2'])

    data.loc[:, 'id1'] = data.fid_iid1.str.split().str[1]
    data.loc[:, 'id2'] = data.fid_iid2.str.split().str[1]
    _sort_ids(data)
    segments_info = data.loc[:, ['id1', 'id2', 'chrom', 'genetic_len']].groupby(by=['id1', 'id2']).agg({'genetic_len': 'sum', 'chrom': 'count'})
    segments_info.rename({'genetic_len': 'total_seg_len_germline', 'chrom': 'seg_count_germline'}, axis=1, inplace=True)
    return segments_info


def map_king_degree(king_degree):
    degree_map = {
        'Dup/MZ': 0,
        'PO': 1,
        'FS': 2,
        '2nd': 2,
        '3rd': 3,
        '4th': 4,
        'NA': numpy.nan
    }
    return [degree_map[kd] for kd in king_degree]


def map_king_relation(king_degree):
    degree_map = {
        'Dup/MZ': 0,
        'PO': 'PO',
        'FS': 'FS',
        '2nd': 2,
        '3rd': 3,
        '4th': 4,
        'NA': numpy.nan
    }
    return [degree_map[kd] for kd in king_degree]


def read_king(king_path):
    # FID1    ID1     FID2    ID2     MaxIBD1 MaxIBD2 IBD1Seg IBD2Seg PropIBD InfType
    try:
        data = pandas.read_table(king_path)
        data.loc[:, 'id1'] = data.FID1.astype(str) + '_' + data.ID1.astype(str)
        data.loc[:, 'id2'] = data.FID2.astype(str) + '_' + data.ID2.astype(str)
        _sort_ids(data)

        data.loc[:, 'king_degree'] = map_king_degree(data.InfType)
        data.loc[:, 'king_relation'] = map_king_relation(data.InfType)
        data.loc[:, 'king_degree'] = data.king_degree.astype(float).astype(pandas.Int32Dtype())
        data.rename({'PropIBD': 'shared_genome_proportion', 'IBD1Seg': 'king_ibd1_prop', 'IBD2Seg': 'king_ibd2_prop'}, axis='columns', inplace=True)
        indexed = data.loc[:, ['id1', 'id2', 'king_degree', 'king_relation', 'king_ibd1_prop', 'king_ibd2_prop', 'shared_genome_proportion']].\
            set_index(['id1', 'id2'])
        return indexed

    except pandas.errors.EmptyDataError:
        return pandas.DataFrame(data=None, index=None,
                                columns=['id1',
                                         'id2',
                                         'king_degree',
                                         'king_relation',
                                         'king_ibd1_prop',
                                         'king_ibd2_prop',
                                         'shared_genome_proportion']).set_index(['id1', 'id2'])


def read_king_segments_chunked(king_segments_path, map_dir):
    total = None
    try:
        for i, chunk in enumerate(pandas.read_table(
                                                    king_segments_path, 
                                                    compression='gzip', 
                                                    dtype={'FID1': str, 'ID1': str, 'FID2': str, 'ID2': str}, 
                                                    chunksize=1e+6)):

            segments = {}
            for i, row in chunk.iterrows():
                id1 = row['FID1'] + '_' + row['ID1']
                id2 = row['FID2'] + '_' + row['ID2']
                seg = Segment(id1, id2, row['Chr'],
                            bp_start=row['StartMB']*1e+6, bp_end=row['StopMB']*1e+6)
                key = tuple(sorted((seg.id1, seg.id2)))
                if key not in segments:
                    segments[key] = [seg]
                else:
                    segments[key].append(seg)

            segments = interpolate_all(segments, map_dir)
            data = pandas.DataFrame(columns=['id1', 'id2', 'total_seg_len_king', 'seg_count_king'])
            logging.info(f'loaded and interpolated segments from chunk {i} for {len(segments)} pairs')
            rows = []
            for key, segs in segments.items():
                row = {'id1': key[0],
                    'id2': key[1],
                    'total_seg_len_king': sum([s.cm_len for s in segs]),
                    'seg_count_king': len(segs)}
                rows.append(row)

            rows_frame = pandas.DataFrame.from_records(rows, columns=['id1', 'id2', 'total_seg_len_king', 'seg_count_king'])
            data = pandas.concat([data, rows_frame], ignore_index=True)

            _sort_ids(data)
            data = data.set_index(['id1', 'id2'])
            total = total.append(data) if total is not None else data

        return total

    except pandas.errors.EmptyDataError:
        return pandas.DataFrame(columns=['id1', 'id2', 'total_seg_len_king', 'seg_count_king']).set_index(['id1', 'id2'])


def read_king_segments(king_segments_path, map_dir):
    try:
        segments = rks(king_segments_path)
        segments = interpolate_all(segments, map_dir)
        data = pandas.DataFrame(columns=['id1', 'id2', 'total_seg_len_king', 'seg_count_king'])
        logging.info(f'loaded and interpolated segments for {len(segments)} pairs')
        for key, segs in segments.items():
            row = {'id1': key[0],
                   'id2': key[1],
                   'total_seg_len_king': sum([s.cm_len for s in segs]),
                   'seg_count_king': len(segs)}
            data = data.append(row, ignore_index=True)

        _sort_ids(data)
        return data.set_index(['id1', 'id2'])

    except pandas.errors.EmptyDataError:
        return pandas.DataFrame(columns=['id1', 'id2', 'total_seg_len_king', 'seg_count_king']).set_index(['id1', 'id2'])


def kinship_to_degree(kinship):
    degrees = []
    # intervals are from KING manual
    # >0.354, [0.177, 0.354], [0.0884, 0.177] and [0.0442, 0.0884]
    for k in kinship:
        if k > 0.354:
            degrees.append(0)
        elif 0.177 < k <= 0.354:
            degrees.append(1)
        elif 0.0884 < k <= 0.177:
            degrees.append(2)
        elif 0.0442 < k <= 0.0884:
            degrees.append(3)
        else:
            degrees.append(None)
    return degrees


def _read_kinship_data(kinship_path):
    try:
        data = pandas.read_table(kinship_path)
        print(kinship_path, data.shape)
        # If no relations were found, king creates file with only header
        if data.shape[0] != 0:
            if 'FID' in data.columns:
                data.loc[:, 'id1'] = data.FID.astype(str) + '_' + data.ID1.astype(str)
                data.loc[:, 'id2'] = data.FID.astype(str) + '_' + data.ID2.astype(str)
            else:
                data.loc[:, 'id1'] = data.FID1.astype(str) + '_' + data.ID1.astype(str)
                data.loc[:, 'id2'] = data.FID2.astype(str) + '_' + data.ID2.astype(str)

            _sort_ids(data)
            data.rename({'Kinship': 'kinship'}, axis=1, inplace=True)
            data.loc[:, 'kinship_degree'] = kinship_to_degree(data.kinship)
            data = data.loc[:, ['id1', 'id2', 'kinship', 'kinship_degree']].set_index(['id1', 'id2'])
        else:
            data = pandas.DataFrame(columns=['id1', 'id2', 'kinship', 'kinship_degree']).set_index(['id1', 'id2'])
        return data
    except pandas.errors.EmptyDataError:
        return pandas.DataFrame(columns=['id1', 'id2', 'kinship', 'kinship_degree']).set_index(['id1', 'id2'])


def read_kinship(kinship_path, kinship0_path):
    # parse within families
    # FID     ID1     ID2     N_SNP   Z0      Phi     HetHet  IBS0    Kinship Error
    within = _read_kinship_data(kinship_path)
    logging.info(f'loaded {within.shape[0]} pairs from within-families kinship estimation results')

    # FID1    ID1     FID2    ID2     N_SNP   HetHet  IBS0    Kinship
    across = _read_kinship_data(kinship0_path)
    logging.info(f'loaded {across.shape[0]} pairs from across-families kinship estimation results')
    return pandas.concat([within, across], axis=0)


def read_ersa(ersa_path):
    # Indv_1     Indv_2      Rel_est1      Rel_est2      d_est     lower_d  upper_d     N_seg     Tot_cM
    data = pandas.read_table(ersa_path, header=0,
                             names=['id1', 'id2', 'rel_est1', 'rel_est2',
                                    'ersa_degree', 'ersa_lower_bound', 'ersa_upper_bound', 'seg_count_ersa', 'total_seg_len_ersa'],
                             dtype={'ersa_degree': str, 'seg_count_ersa': str, 'total_seg_len_ersa': str})

    data = data.loc[(data.rel_est1 != 'NA') | (data.rel_est2 != 'NA'), :]
    data.loc[:, 'id1'] = data.id1.str.strip()
    data.loc[:, 'id2'] = data.id2.str.strip()
    _sort_ids(data)
    data.loc[:, 'ersa_degree'] = pandas.to_numeric(data.ersa_degree.str.strip(), errors='coerce').astype(
        pandas.Int32Dtype())
    data.loc[:, 'ersa_lower_bound'] = pandas.to_numeric(data.ersa_lower_bound.str.strip(), errors='coerce').astype(
        pandas.Int32Dtype())
    data.loc[:, 'ersa_upper_bound'] = pandas.to_numeric(data.ersa_upper_bound.str.strip(), errors='coerce').astype(
        pandas.Int32Dtype())
    print('dtypes are: ', data.dtypes)
    data.loc[:, 'seg_count_ersa'] = pandas.to_numeric(data.seg_count_ersa.str.strip(), errors='coerce').astype(
        pandas.Int32Dtype())
    data.loc[:, 'total_seg_len_ersa'] = pandas.to_numeric(data.total_seg_len_ersa.str.strip(), errors='coerce')

    data.loc[data.ersa_lower_bound != 1, 'ersa_lower_bound'] = data.ersa_lower_bound - 1
    data.loc[data.ersa_upper_bound != 1, 'ersa_upper_bound'] = data.ersa_upper_bound - 1

    logging.info(f'read {data.shape[0]} pairs from ersa output')
    logging.info(f'ersa after reading has {pandas.notna(data.ersa_degree).sum()} non-null degree estimates')
    data.loc[:, 'is_niece_aunt'] = ['Niece' in d for d in data.rel_est1]
    logging.info(f'we have total of {data.is_niece_aunt.sum()} possible Niece/Aunt relationships')

    return data.loc[data.id1 != data.id2, ['id1', 'id2', 'ersa_degree', 'ersa_lower_bound', 'ersa_upper_bound', 'is_niece_aunt', 'seg_count_ersa', 'total_seg_len_ersa']].\
        set_index(['id1', 'id2'])


def read_ersa2(ersa_path):
    # individual_1    individual_2    est_number_of_shared_ancestors  est_degree_of_relatedness       0.95 CI_2p_lower:4
    # 2p_upper:5     1p_lower:6 1p_upper:7        0p_lower:8        0p_upper:9
    # maxlnl_relatedness      maxlnl_unrelatednes
    data = pandas.read_table(ersa_path, comment='#')

    data = data.loc[data.est_degree_of_relatedness != 'no_sig_rel', :]
    data.rename({'individual_1': 'id1', 'individual_2': 'id2', 'est_number_of_shared_ancestors': 'shared_ancestors'}, axis=1, inplace=True)
    data.loc[:, 'ersa_degree'] = pandas.to_numeric(data['est_degree_of_relatedness'], errors='coerce').\
        astype(pandas.Int32Dtype())

    lower, upper = [], []
    for i, ancestors in enumerate(data.shared_ancestors):
        if ancestors == 0:
            lower.append(data.iloc[i, 8])
            upper.append(data.iloc[i, 9])
        if ancestors == 1:
            lower.append(data.iloc[i, 6])
            upper.append(data.iloc[i, 7])
        if ancestors == 2:
            lower.append(data.iloc[i, 4])
            upper.append(data.iloc[i, 5])

    data.loc[:, 'ersa_lower_bound'] = lower
    data.loc[:, 'ersa_upper_bound'] = upper

    cols = ['id1', 'id2', 'ersa_degree', 'ersa_lower_bound', 'ersa_upper_bound', 'shared_ancestors']
    return data.loc[data.id1 != data.id2, cols].set_index(['id1', 'id2'])


if __name__ == '__main__':

    try:
        snakemake
    except NameError:
        test_dir = '/media_ssd/pipeline_data/TF-CEU-TRIBES-ibis-king-2'
        Snakemake = namedtuple('Snakemake', ['input', 'output', 'params', 'log'])
        snakemake = Snakemake(
            input={'king': f'{test_dir}/king/data.seg',
                   'king_segments': f'{test_dir}/king/data.segments.gz',
                   'kinship': f'{test_dir}/king/data.kin',
                   'kinship0': f'{test_dir}/king/data.kin0',
                   'ersa': f'{test_dir}/ersa/relatives.tsv'},

            output=[f'test_data/merge_king_ersa.tsv'],
            params={'cm_dir': f'{test_dir}/cm'},
            log=['test_data/merge_king_ersa.log']
        )

    logging.basicConfig(filename=snakemake.log[0], level=logging.DEBUG, format='%(levelname)s:%(asctime)s %(message)s')

    king_path = snakemake.input['king']
    king_segments_path = snakemake.input['king_segments']
    # within families
    kinship_path = snakemake.input['kinship']
    # across families
    kinship0_path = snakemake.input['kinship0']
    ersa_path = snakemake.input['ersa']
    map_dir = snakemake.params['cm_dir']
    output_path = snakemake.output[0]

    king = read_king(king_path)
    kinship = read_kinship(kinship_path, kinship0_path)
    king_segments = read_king_segments_chunked(king_segments_path, map_dir)
    ersa = read_ersa(ersa_path)

    relatives = king.merge(kinship, how='outer', left_index=True, right_index=True).\
                     merge(ersa, how='outer', left_index=True, right_index=True).\
                     merge(king_segments, how='outer', left_index=True, right_index=True)

    prefer_ersa_mask = pandas.isnull(relatives.king_degree) | (relatives.king_degree > 3)
    relatives.loc[:, 'final_degree'] = relatives.king_degree
    # if king is unsure or king degree > 3 then we use ersa distant relatives estimation
    relatives.loc[prefer_ersa_mask, 'final_degree'] = relatives.ersa_degree
    logging.info(f'king is null or more than 3: {prefer_ersa_mask.sum()}')
    logging.info(f'ersa is not null: {pandas.notna(relatives.ersa_degree).sum()}')
    print(f'relatives columns are {relatives.columns}')
    if 'total_seg_len_king' in relatives.columns:
        relatives.loc[:, 'total_seg_len'] = relatives.total_seg_len_king*relatives.king_ibd1_prop
        relatives.loc[:, 'total_seg_len_ibd2'] = relatives.total_seg_len_king*relatives.king_ibd2_prop
        relatives.loc[:, 'seg_count'] = relatives.seg_count_king

    relatives.loc[prefer_ersa_mask, 'total_seg_len'] = relatives.total_seg_len_ersa
    relatives.loc[prefer_ersa_mask, 'seg_count'] = relatives.seg_count_ersa
    print('is na: ', pandas.isna(relatives.loc[prefer_ersa_mask, 'total_seg_len_ersa']).sum())
    # approximate calculations, IBD share is really small in this case
    relatives.loc[prefer_ersa_mask, 'shared_genome_proportion'] = 0.5*relatives.loc[prefer_ersa_mask, 'total_seg_len'].values / 3440
    relatives.drop(['total_seg_len_king', 'seg_count_king', 'total_seg_len_ersa', 'seg_count_ersa', 'king_ibd1_prop', 'king_ibd2_prop'],
                   axis='columns', inplace=True)

    logging.info(f'final degree not null: {pandas.notna(relatives.final_degree).sum()}')

    relatives.loc[pandas.notna(relatives.final_degree), :].to_csv(output_path, sep='\t')
import subprocess
import logging

if __name__ == '__main__':

    vcf = snakemake.input['vcf']
    bad_samples = snakemake.input['bad_samples']
    out = snakemake.params['out']
    batch = snakemake.params['batch']
    log = snakemake.log[0]

    logging.basicConfig(filename=log,
                        level=logging.DEBUG,
                        format='%(levelname)s:%(asctime)s %(message)s')

    p_freqx = subprocess.run(f'plink --vcf {vcf} --freqx --out plink/{out}',
                             shell=True,
                             capture_output=True)

    p_remove = subprocess.run(f'plink --vcf {vcf} --remove {bad_samples} '
                              f'--make-bed --keep-allele-order --out plink/{out}',
                              shell=True,
                              capture_output=True)

    stdout_freqx = p_freqx.stdout.decode()
    stderr_freqx = p_freqx.stderr.decode()
    stderr_remove = p_remove.stderr.decode()
    stdout_remove = p_remove.stdout.decode()
    empty_batch_err_code = 11

    logging.info(f'\nSTDOUT:\n{stdout_remove}\n{stdout_freqx}'
                 f'\nSTDERR:\n{stderr_remove}\n{stderr_freqx}')

    if p_remove.returncode != 0 or p_freqx.returncode != 0:
        if p_remove.returncode == empty_batch_err_code:
            with open('pass_batches.list', 'r') as batches_file:
                lines = batches_file.readlines()
            with open('pass_batches.list', 'w') as batches_file:
                for line in lines:
                    if line.strip('\n') != f'{batch}':
                        batches_file.write(line)
                    else:
                        print(f'Removed {line}!')
        else:
            raise RuntimeError(f"Rule plink_filter for batch {batch} failed with an error! See {log} for details.")
import polars as pl
import logging


def _sort_ids(data):
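    # Canonicalise pair order so that (A, B) and (B, A) collapse to the same key:
    # when id2 < id1 the two ids are swapped via a struct that replaces id1 and is
    # unnested back into id1/id2 after the old id2 column is dropped.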
    return data.with_columns(
        pl.when(pl.col('id2') < pl.col('id1'))
        .then(
            pl.struct([
                pl.col('id2').alias('id1'),
                pl.col('id1').alias('id2')
            ])
        )
        .otherwise(
            pl.struct([
                pl.col('id1').alias('id1'),
                pl.col('id2').alias('id2')
            ])
        )
    ).drop('id2').unnest('id1')


def read_ibis(ibd_path):
    # sample1 sample2 chrom phys_start_pos phys_end_pos IBD_type
    # genetic_start_pos genetic_end_pos genetic_seg_length marker_count error_count error_density

    def get_new_col_names(old_names):
        return ['id1', 'id2', 'chrom', 'phys_start_pos',
                'phys_end_pos', 'IBD_type', 'genetic_start_pos',
                'genetic_end_pos', 'genetic_seg_length', 'marker_count',
                'error_count', 'error_density']

    data = pl.scan_csv(ibd_path, has_header=False, with_column_names=get_new_col_names, separator='\t', null_values="NA").lazy()
    data = data.with_columns([
        pl.col('id1').str.replace(':', '_'),
        pl.col('id2').str.replace(':', '_')
    ])
    data = _sort_ids(data)
    ibd2_info = (
        data
        .filter((pl.col('IBD_type') == 'IBD2'))
        .select(['id1', 'id2', 'chrom', 'genetic_seg_length'])
        .groupby(['id1', 'id2'])
        .agg([
            pl.col('chrom').count(),
            pl.col('genetic_seg_length').sum()
            ])
        )
    ibd2_info = ibd2_info.rename({'genetic_seg_length': 'total_seg_len_ibd2', 'chrom': 'seg_count_ibd2'})
    return ibd2_info


def read_ersa(ersa_path):
    # Indv_1     Indv_2      Rel_est1      Rel_est2      d_est     lower_d  upper_d     N_seg     Tot_cM

    def get_new_col_names(old_names):
        return ['id1', 'id2', 'rel_est1', 'rel_est2',
               'ersa_degree', 'ersa_lower_bound', 'ersa_upper_bound',
                'seg_count', 'total_seg_len']

    data = pl.scan_csv(ersa_path, has_header=True, separator='\t', null_values="NA",
                             with_column_names=get_new_col_names,
                             dtypes={'ersa_degree': str, 'ersa_lower_bound': str, 'ersa_upper_bound': str,
                                    'seg_count': str, 'total_seg_len': str})

    data = data.filter((pl.col('rel_est1').is_null().is_not()) | (pl.col('rel_est2').is_null().is_not()))
    data = data.with_columns([
        pl.col('id1').str.strip(),
        pl.col('id2').str.strip()
    ])
    data = _sort_ids(data)
    print(list(zip(data.columns, data.dtypes)))
    data = data.with_columns([
        pl.col('ersa_degree').str.strip().cast(pl.Int32, strict=False),
        pl.col('ersa_lower_bound').str.strip().cast(pl.Int32, strict=False),
        pl.col('ersa_upper_bound').str.strip().cast(pl.Int32, strict=False),
        pl.col('seg_count').str.strip().cast(pl.Int32, strict=False),
        pl.col('total_seg_len').str.replace(',', '').str.strip().cast(pl.Float64, strict=False)
    ])
    found = data.select([pl.col('total_seg_len').where(pl.col('total_seg_len') > 1000).sum()])
    # print(f'parsed total_seg_len, found {found} long entries')
    # print(f'{data.filter(pl.col("ersa_lower_bound").is_null()).collect(streaming=True)}')
    # print(data.with_columns(pl.col('ersa_lower_bound') - 1).collect(streaming=True).head(5))
    data = data.with_columns([
        pl.when(pl.col('ersa_lower_bound') != 1)
        .then(pl.col('ersa_lower_bound') - 1).otherwise(pl.col('ersa_lower_bound')),
        pl.when(pl.col('ersa_upper_bound') != 1)
        .then(pl.col('ersa_upper_bound') - 1).otherwise(pl.col('ersa_upper_bound'))
    ])
    print(data)
    print(data.collect(streaming=True).head(5))
    #logging.info(f'read {data.shape[0]} pairs from ersa output')
    #logging.info(f'ersa after reading has {len(data) - data["ersa_degree"].null_count()}')
    data = data.select(
        ['id1', 'id2', 'ersa_degree', 'ersa_lower_bound', 'ersa_upper_bound', 'seg_count', 'total_seg_len']) \
        .filter(pl.col('id1') != pl.col('id2'))  # Removed .set_index(['id1', 'id2'])
    return data


if __name__ == '__main__':

    logging.basicConfig(filename='log.log', level=logging.DEBUG, format='%(levelname)s:%(asctime)s %(message)s')

    ibd_path = snakemake.input['ibd']
    # within families
    # across families
    ersa_path = snakemake.input['ersa']
    output_path = snakemake.output[0]

    with open(ersa_path, 'r') as ersa_file, open(ibd_path, 'rb') as ibd_file:
        if len(ersa_file.readlines(5000)) < 2 or len(ibd_file.readlines(5000)) < 1:
            print("ersa postprocess input is empty")
            open(output_path, "w").close()  # create empty output to avoid error
            quit()
    if snakemake.params.get('ibis', True):
        ibd = read_ibis(ibd_path)
    else:
        _columns = [
            ('id1', pl.Utf8),
            ('id2', pl.Utf8),
            ('total_seg_len_ibd2', pl.Float64),
            ('seg_count_ibd2', pl.Int32)
        ]
        ibd = pl.DataFrame(data=[], schema=_columns).lazy()

    ersa = read_ersa(ersa_path)

    #logging.info(f'ibd shape: {ibd.shape[0]}, ersa shape: {ersa.shape[0]}')
    relatives = ibd.join(ersa, how='outer', on=['id1', 'id2'])  # No left_index / right_index input params

    # It is impossible to have more than 50% of ibd2 segments unless you are monozygotic twins or duplicates.
    GENOME_CM_LEN = 3400
    DUPLICATES_THRESHOLD = 1750
    FS_TOTAL_THRESHOLD = 2100
    FS_IBD2_THRESHOLD = 450
    dup_mask = pl.col('total_seg_len_ibd2') > DUPLICATES_THRESHOLD
    fs_mask = (pl.col('total_seg_len') > FS_TOTAL_THRESHOLD) & (pl.col('total_seg_len_ibd2') > FS_IBD2_THRESHOLD) & (
        dup_mask.is_not())
    po_mask = (pl.col('total_seg_len') / GENOME_CM_LEN > 0.77) & (fs_mask.is_not()) & (dup_mask.is_not())
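    # Heuristic classification of close relationships from IBD sharing (thresholds in cM):
    # duplicates / MZ twins share more than DUPLICATES_THRESHOLD in IBD2; full siblings need both a high
    # total IBD length and a substantial IBD2 length; parent-offspring pairs share more than 77% of the
    # genome length and are not already classified as FS or duplicates.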

    relatives = relatives.with_columns([
        pl.when(dup_mask)
        .then(0)
        .when(fs_mask)
        .then(2)
        .when(po_mask)
        .then(1)
        #.when((pl.col('ersa_degree') == 1)) # when ersa_degree is 1 but PO mask does not tell us that it is PO
        #.then(2)
        .otherwise(pl.col('ersa_degree'))
        .alias('final_degree'),

        pl.when(dup_mask)
        .then(pl.lit('MZ/Dup'))  # wrap string labels in pl.lit so they are treated as literals, not column names
        .when(fs_mask)
        .then(pl.lit('FS'))
        .when(po_mask)
        .then(pl.lit('PO'))
        #.when((pl.col('ersa_degree') == 1)) # when ersa_degree is 1 but PO mask does not tell us that it is PO
        #.then(pl.lit('FS'))
        .otherwise(pl.col('ersa_degree').cast(pl.Utf8))  # cast so all branches share a common dtype
        .alias('relation'),

        pl.col('total_seg_len_ibd2')
        .fill_null(pl.lit(0)),

        pl.col('seg_count_ibd2')
        .fill_null(pl.lit(0))
    ])


    '''
        IBIS and ERSA do not distinguish IBD1 and IBD2 segments
        shared_genome_proportion is a proportion of identical alleles
        i.e. shared_genome_proportion of [(0/0), (0/1)] and [(0/0), (1/1)] should be 3/4
        GENOME_SEG_LEN is length of centimorgans of the haplotype
        ibd2 segments span two haplotypes, therefore their length should be doubled
        total_seg_len includes both ibd1 and ibd2 segments length
    '''
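    # Illustrative numbers (not from the pipeline): a full-sibling pair with total_seg_len = 2600 cM,
    # of which total_seg_len_ibd2 = 800 cM, gives (2600 - 800)/6800 + 800*2/6800 = 0.5,
    # i.e. the expected ~50% shared genome for full siblings.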
    shared_genome_formula = (pl.col('total_seg_len') - pl.col('total_seg_len_ibd2')) / (2*GENOME_CM_LEN) + pl.col('total_seg_len_ibd2')*2 / (2*GENOME_CM_LEN)
    relatives = relatives.with_columns(
        pl.when(shared_genome_formula > 1)
        .then(1)
        .otherwise(shared_genome_formula)
        .alias('shared_genome_proportion')
    )
    relatives = relatives.filter(~(pl.col('final_degree').is_null())).collect(streaming=True)
    print(f'We have {len(relatives.filter(dup_mask))} duplicates, '
          f'{len(relatives.filter(fs_mask))} full siblings and '
          f'{len(relatives.filter(po_mask))} parent-offspring relationships')
    logging.info(f'final degree not null: {len(relatives["final_degree"]) - relatives["final_degree"].null_count()}')
    relatives.write_csv(output_path, separator='\t')
import logging
import re


ATGC = {'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C'}
atgc = {'A': 1, 'T': 1, 'G': 0, 'C': 0}


def is_ambiguous(a1, a2):
    """if SNP is ambiguous or not"""
    if atgc[a1] == atgc[a2]:
        return True
    return False


def match_ref(a1, a2, ref, alt):
    """Match a1, a2 with ref and alt
    None: match, 1: swap, 2: flip, 3: flip&swap, 4: exclude
    """
    if a1 == ref and a2 == alt:
        return
    elif a1 == alt and a2 == ref:
        return 1
    else:
        ambiguous = is_ambiguous(a1, a2)
        if ambiguous:
            return 4
        else:
            a1_f, a2_f = [ATGC[i] for i in [a1, a2]]
            if a1_f == ref and a2_f == alt:
                return 2
            elif a1_f == alt and a2_f == ref:
                return 3
            else:
                return 4
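
# Illustrative calls with assumed alleles (not from the pipeline):
#   match_ref('A', 'G', 'A', 'G') -> None  (already matches REF/ALT)
#   match_ref('G', 'A', 'A', 'G') -> 1     (swap)
#   match_ref('T', 'C', 'A', 'G') -> 2     (strand flip)
#   match_ref('C', 'T', 'A', 'G') -> 3     (flip & swap)
#   match_ref('A', 'T', 'C', 'G') -> 4     (ambiguous A/T pair, excluded)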


def get_id(a1_a2, id_element, chr_, pos_, ref_, alt_):
    # first, try to use rs
    id_ = id_element.split(':')[0]
    if id_ in a1_a2:
        return id_
    # then, try chr:pos
    id_ = "{}:{}".format(chr_, pos_)
    if id_ in a1_a2:
        return id_
    # try chr:pos:ref:alt
    id_ = "{}:{}:{}:{}".format(chr_, pos_, ref_, alt_)
    if id_ in a1_a2:
        return id_
    # try chr:pos:alt:ref
    id_ = "{}:{}:{}:{}".format(chr_, pos_, alt_, ref_)
    if id_ in a1_a2:
        return id_
    return None


def pre_imputation_check(params, reference):
    """Given a plink bim file
    0. Remove SNPS with worldwide MAF < 0.02
    1. Remove SNPs can not be matched
    2. Flip SNPs that match after flip
    3. Swap SNPs that match after swap or flip&swap
    4. Update chromosomes and positions for these SNPs
    """
    prefix = params
    fn_new = prefix
    a1_a2 = {}
    for i in open(fn_new):
        items = i.split()
        # id: (ref, alt)
        # %CHROM:%POS:%REF:%FIRST_ALT
        a1_a2[items[1]] = (items[-2], items[-1])

    # files to update bim
    chr_path = prefix + '.chr'
    pos_path = prefix + '.pos'
    force_allele_path = prefix + '.force_allele'
    flip_path = prefix + '.flip'
    freq_path = prefix + '.freq'
    with open(chr_path, 'w') as chr_file, open(pos_path, 'w') as pos_file, open(force_allele_path, 'w') as force_allele_file, open(flip_path, 'w') as flip_file, open(freq_path, 'w') as freq_file:

        af_regex = re.compile(r';AF=(0\.\d+)')
        # write the files to be used
        in_ref = 0
        exclude = 0
        strand_flip = 0
        swap = 0
        flip_swap = 0
        unique_ids = set()
        for i in open(reference):
            items = i.split()
            str_maf = af_regex.findall(items[7])
            maf = float(str_maf[0]) if len(str_maf) > 0 else 0.0
            chr_, pos_, ref_, alt_ = items[0], items[1], items[3], items[4]
            # first, try to use the rs ID
            id_ = get_id(a1_a2, items[2], chr_, pos_, ref_, alt_)

            if id_ is not None:
                a1, a2 = a1_a2[id_]
                matching = match_ref(a1, a2, ref_, alt_)
                in_ref += 1
                if matching == 4:
                    exclude += 1
                else:
                    if id_ in unique_ids:
                        continue
                    if maf > 0.02:
                        freq_file.write(id_ + "\n")
                    chr_file.write("{}\t{}\n".format(id_, chr_))
                    pos_file.write("{}\t{}\n".format(id_, pos_))
                    # set alt as A1, because recode vcf will code A1 as alt later
                    force_allele_file.write("{}\t{}\n".format(id_, alt_))
                    unique_ids.add(id_)
                    if matching == 2:
                        flip_file.write(id_ + "\n")
                        strand_flip += 1
                    elif matching == 1:
                        swap += 1
                    elif matching == 3:
                        flip_file.write(id_ + "\n")
                        flip_swap += 1

    logging.info("Exclude: {} Keep: {}".format(exclude, in_ref - exclude))
    logging.info("Total flip: {}.".format(strand_flip))
    logging.info("Total swap: {}.".format(swap))
    logging.info("Total flip&swap: {}.".format(flip_swap))
    return flip_path, force_allele_path, chr_path, pos_path


if __name__ == "__main__":
    logging.basicConfig(filename=snakemake.log[0], level=logging.DEBUG, format='%(levelname)s:%(asctime)s %(message)s')

    pre_imputation_check(snakemake.input[0], snakemake.params[0])
import gzip
import logging


if __name__ == '__main__':
    genotyped_count = 0
    input_file = snakemake.input['vcf']
    output_file = snakemake.output['vcf']

    logging.basicConfig(filename=snakemake.log[0], level=logging.DEBUG, format='%(levelname)s:%(asctime)s %(message)s')

    cnt = 0
    with gzip.open(output_file, 'ab') as gzab:
        with gzip.open(input_file, 'r') as gzr:
            for cnt, line in enumerate(gzr, start=1):
                if cnt % 100000 == 0:
                    logging.info(f'processed {cnt} variants')
                if b'IMPUTED' not in line:
                    genotyped_count += 1
                    gzab.write(line)
    logging.info(f'processed {cnt} variants, removed {cnt - genotyped_count} variants, {genotyped_count} variants remained')
import pandas
import logging
from utils.bcftools import bcftools_stats
from utils.bcftools import bcftools_query
from utils.bcftools import bcftools_view
from io import StringIO


def find_outliers(psc: pandas.DataFrame, outliers_file_path: str, keep_samples_file_path: str, alpha: float):
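    # Tukey-style fence on the number of non-missing genotypes per sample:
    # samples outside [Q1 - alpha*IQR, Q3 + alpha*IQR] are flagged as outliers and written to
    # outliers_file_path; the remaining sample ids are written to keep_samples_file_path.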

    q1 = psc.nNonMissing.quantile(0.25)
    q3 = psc.nNonMissing.quantile(0.75)
    iqr = q3-q1
    lower_bound_mask = psc.nNonMissing < (q1 - alpha*iqr)
    upper_bound_mask = psc.nNonMissing > (q3 + alpha*iqr)
    outliers = psc[lower_bound_mask | upper_bound_mask].copy()
    if outliers.empty:
        outliers['exclusion_reason'] = []
    else:
        outliers.loc[lower_bound_mask, 'exclusion_reason'] = f'Sample was below 1st quantile - IQR*{alpha} ({q1 - alpha*iqr}) by Missing Samples'
        outliers.loc[upper_bound_mask, 'exclusion_reason'] = f'Sample was above 3rd quantile + IQR*{alpha} ({q3 + alpha*iqr}) by Missing Samples'
    # keep every sample that is not an outlier
    keep_samples = psc.loc[~(lower_bound_mask | upper_bound_mask)]

    outliers_list = list(outliers.sample_id)
    with open(outliers_file_path, 'w') as outliers_file:
        for outlier in outliers_list:
            outliers_file.write(outlier + '\n')

    keep_samples_list = list(keep_samples.sample_id)
    with open(keep_samples_file_path, 'w') as keep_samples_file:
        for sample in keep_samples_list:
            keep_samples_file.write(sample + '\n')

    return outliers


def get_stats(vcf_input, samples_path, psc_path=False, stats_path=''):

    bcftools_query(vcf_input, list_samples=True, save_output=samples_path)
    raw_table = bcftools_stats(vcf_input, samples_path, save_output=psc_path, save_stats=stats_path)
    names = [
        'PSC',
        'id',
        'sample_id',
        'nRefHom',
        'nNonRefHom',
        'nHets',
        'nTransitions',
        'nTransversions',
        'nIndels',
        'average depth',
        'nSingletons',
        'nHapRef',
        'nHapAlt',
        'nMissing'
    ]
    psc = pandas.read_table(StringIO(raw_table), header=None, names=names)

    psc.loc[:, 'nNonMissing'] = psc.nRefHom + psc.nNonRefHom + psc.nHets
    psc.loc[:, 'missing_share'] = psc.nMissing / (psc.nMissing + psc.nNonMissing)
    psc.loc[:, 'alt_hom_share'] = psc.nNonRefHom / psc.nNonMissing
    psc.loc[:, 'het_samples_share'] = psc.nHets / psc.nNonMissing

    return psc



if __name__ == '__main__':

    input_vcf = snakemake.input['vcf']
    stats_file = snakemake.output['stats']
    samples = snakemake.params['samples']
    psc_path = snakemake.params['psc']
    keep_samples = snakemake.params['keep_samples']
    bad_samples_path = snakemake.output['bad_samples']
    report_path = snakemake.output['report']
    outliers_path = snakemake.output['outliers']
    no_outliers_vcf = snakemake.output['no_outliers_vcf']
    missing_samples = float(snakemake.params['missing_samples'])
    alt_hom_samples = float(snakemake.params['alt_hom_samples'])
    het_samples = float(snakemake.params['het_samples'])
    alpha = float(snakemake.params['iqr_alpha'])

    logging.basicConfig(filename=snakemake.log[0], level=logging.DEBUG, format='%(levelname)s:%(asctime)s %(message)s')

    # get initial stats (these files are overwritten by the post-outlier-removal stats below)
    logging.info("Getting initial vcf stats...")
    psc = get_stats(vcf_input=input_vcf,
                    samples_path=samples,
                    psc_path=psc_path,
                    stats_path=stats_file)
    # filter outliers
    outliers = find_outliers(psc, outliers_file_path=outliers_path, keep_samples_file_path=keep_samples, alpha=alpha)
    logging.info(f"Found {len(outliers)} outliers!")
    if not outliers.empty:
        bcftools_view(input_vcf, no_outliers_vcf, keep_samples)
    else:
        with open(no_outliers_vcf, 'w') as dummy_no_outliers_vcf:
            dummy_no_outliers_vcf.write('')
        no_outliers_vcf = input_vcf

    # get final stats without outliers
    logging.info("Getting final vcf stats...")
    psc = get_stats(vcf_input=no_outliers_vcf,
                    samples_path=samples,
                    psc_path=psc_path,
                    stats_path=stats_file)

    bad_missing_samples_mask = (psc.missing_share >= missing_samples / 100) | (psc.nMissing + psc.nNonMissing == 0)

    bad_alt_hom_samples_mask = (psc.alt_hom_share < alt_hom_samples / 100) | (psc.nNonMissing == 0)

    bad_het_samples_mask = (psc.het_samples_share < het_samples / 100) | (psc.nNonMissing == 0)

    psc.loc[bad_het_samples_mask, 'exclusion_reason'] = f'Sample has <= {het_samples}% heterozygous SNPs'
    psc.loc[bad_alt_hom_samples_mask, 'exclusion_reason'] = f'Sample has <= {alt_hom_samples}% homozygous alternative SNPs'
    psc.loc[bad_missing_samples_mask, 'exclusion_reason'] = f'Sample has >= {missing_samples}% missing SNPs'
    bad_samples = psc.loc[(bad_alt_hom_samples_mask | bad_het_samples_mask | bad_missing_samples_mask),
                          ['sample_id', 'missing_share', 'alt_hom_share', 'het_samples_share', 'exclusion_reason']]
    psc = pandas.concat([psc, outliers[['sample_id', 'missing_share', 'alt_hom_share', 'het_samples_share', 'exclusion_reason']]], ignore_index=True)

    samples_only = bad_samples.loc[:, ['sample_id']].copy()
    # if input vcf file has iids in form fid_iid we split it, else we just assign fid equal to iid
    samples_only.loc[:, 'fid'] = [s.split('_')[0] if '_' in s else s for s in samples_only.sample_id]
    samples_only.loc[:, 'iid'] = [s.split('_')[1] if '_' in s else s for s in samples_only.sample_id]

    samples_only.loc[:, ['fid', 'iid']].to_csv(bad_samples_path, sep='\t', header=None, index=False)
    bad_samples.to_csv(report_path, sep='\t')

    log = f'We have a total of {psc.shape[0]} samples\n' \
          f'{bad_missing_samples_mask.sum()} samples have >= {missing_samples}% missing share\n' \
          f'{bad_alt_hom_samples_mask.sum()} samples have <= {alt_hom_samples}% homozygous alternative variants\n' \
          f'{bad_het_samples_mask.sum()} samples have <= {het_samples}% heterozygous variants\n' \
          f'{len(outliers)} samples detected as outliers\n'

    logging.info(log)
    print(log)
import pandas
import os


if __name__ == '__main__':
    path = snakemake.input['bim']
    map_dir = snakemake.params['cm_dir']
    bim = pandas.read_table(path, header=None, names=['chrom', '_id', 'cm_pos', 'bp_pos', 'ref', 'alt'])

    maps = bim.loc[:, ['chrom', '_id', 'cm_pos', 'bp_pos']].groupby(by='chrom')
    for chrom, cmmap in maps:
        map_path = os.path.join(map_dir, f'chr{chrom}.cm.map')

        frame = cmmap.reset_index()
        frame.to_csv(map_path, index=False, sep='\t', header=None)
        print(f'written map for chrom {chrom} to {map_path}')

    print(f'written {len(maps)} maps to {map_dir}')
import pandas
import logging
import mmh3
from typing import List
import os
from collections import namedtuple


def process_chunk_with_hash(data: pandas.DataFrame, denominator: int, dest_dir: str):
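    # Reformat IBIS segment rows into the tab-separated layout sketched in the awk one-liner below
    # and partition pairs into buckets by a murmur3 hash of id1 so that downstream rules can
    # process each bucket independently.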
    data.loc[:, 'id1'] = data.sample1.str.replace(':', '_') + ' ' + data.sample1.str.replace(':', '_')
    data.loc[:, 'id2'] = data.sample2.str.replace(':', '_') + ' ' + data.sample2.str.replace(':', '_')
    # awk '{{sub(":", "_", $1); sub(":", "_", $2); print $1, $1 "\t" $2, $2 "\t" $3 "\t" $4, $5 "\t" 0, 0 "\t" $10 "\t" $9 "\t" "cM" "\t" 0 "\t" 0 "\t" 0}};
    data.loc[:, 'phys_pos'] = [f'{start} {end}' for start, end in zip(data.phys_start_pos, data.phys_end_pos)]
    data.loc[:, 'zero1'] = '0 0'
    data.loc[:, 'zero2'] = '0'
    data.loc[:, 'cM'] = 'cM'
    data.loc[:, 'ibd21'] = [2 if ibd_type == 'IBD2' else 0 for ibd_type in data.IBD_type]
    data.loc[:, 'ibd22'] = data.ibd21
    data.loc[:, 'bucket_id'] = [mmh3.hash(id1) % denominator for id1 in data.id1]

    to_write = data.loc[:,
               ['bucket_id', 'id1', 'id2', 'chrom', 'phys_pos', 'zero1', 'marker_count', 'genetic_seg_length', 'cM', 'zero2',
                'ibd21', 'ibd22']]

    for bucket_id, group in to_write.groupby('bucket_id'):
        dest_file = os.path.join(dest_dir, f'{bucket_id}.tsv.gz')
        group.drop('bucket_id', inplace=True, axis='columns')
        group.to_csv(dest_file, index=False, header=None, sep='\t', mode='a', compression='gzip')


def split_by_id(input_ibd: str, samples_count: int, dest_dir: str):
    names = [
        'sample1',
        'sample2',
        'chrom',
        'phys_start_pos',
        'phys_end_pos',
        'IBD_type',
        'genetic_start_pos',
        'genetic_end_pos',
        'genetic_seg_length',
        'marker_count',
        'error_count',
        'error_density'
    ]
    read_chunksize = int(1e+6)
    samples_chunksize = 2000
    denominator = samples_count // samples_chunksize + 1

    for i, chunk in enumerate(pandas.read_csv(input_ibd, header=None, names=names, sep='\t', chunksize=read_chunksize)):
        if not chunk.empty:
            process_chunk_with_hash(chunk, denominator, dest_dir)
            logging.info(f'Chunk {i} of size {read_chunksize} was written to {dest_dir} and split into {denominator} buckets')
        else:
            logging.info(f'No IBD segments were found in {input_ibd}. Nothing was written to {dest_dir}')


if __name__ == '__main__':
    try:
        snakemake
    except NameError:
        Snakemake = namedtuple('Snakemake', ['input', 'output', 'log'])
        snakemake = Snakemake(
            input={
                'ibd': '/media/data1/relatives/runs/run100k/ibis/merged_ibis.seg',
                'fam': '/media/data1/relatives/runs/run100k/preprocessed/data.fam'},
            output={'bucket_dir': '/media/data1/relatives/test100koutput.txt'},
            log=['/media/data1/relatives/test100koutput.log']
        )

    ibd = snakemake.input['ibd']
    bucket_dir = snakemake.output['bucket_dir']
    if not os.path.exists(bucket_dir):
        os.makedirs(bucket_dir)
    for file in os.listdir(bucket_dir):
        os.remove(os.path.join(bucket_dir, file))

    # the .fam file has no header, so read it with header=None instead of compensating with +1
    samples = pandas.read_table(snakemake.input['fam'], header=None)
    samples_count = samples.shape[0]
    print(snakemake.log)
    logging.basicConfig(filename=snakemake.log[0], level=logging.DEBUG, format='%(levelname)s:%(asctime)s %(message)s')

    split_by_id(ibd, samples_count, bucket_dir)
import pandas
import logging
import mmh3
from typing import List
import os
from collections import namedtuple


def process_chunk_with_hash(data: pandas.DataFrame, denominator: int, dest_dir: str):
    logging.info(f'DTYPES of rapid frame are {data.dtypes}')
    data.loc[:, 'id1'] = data.sample1.str.replace(':', '_') + ' ' + data.sample1.str.replace(':', '_')
    data.loc[:, 'id2'] = data.sample2.str.replace(':', '_') + ' ' + data.sample2.str.replace(':', '_')
    data.loc[:, 'phys_pos'] = [f'{start} {end}' for start, end in zip(data.genetic_start_pos, data.genetic_end_pos)]
    data.loc[:, 'zero1'] = '0 0'
    data.loc[:, 'zero2'] = '0'
    data.loc[:, 'cM'] = 'cM'
    data.loc[:, 'ibd21'] = 0
    data.loc[:, 'ibd22'] = 0
    data.loc[:, 'bucket_id'] = [mmh3.hash(id1) % denominator for id1 in data.id1]
    data.loc[:, 'marker_count'] = data.ending_site - data.starting_site

    to_write = data.loc[:,
               ['bucket_id', 'id1', 'id2', 'chrom', 'phys_pos', 'zero1', 'marker_count', 'genetic_seg_length', 'cM', 'zero2',
                'ibd21', 'ibd22']]

    for bucket_id, group in to_write.groupby('bucket_id'):
        dest_file = os.path.join(dest_dir, f'{bucket_id}.tsv')
        group.drop('bucket_id', inplace=True, axis='columns')
        group.to_csv(dest_file, index=False, header=None, sep='\t', mode='a')


def split_by_id(input_ibd: str, samples_count: int, dest_dir: str):
    """
    <chr_name> <sample_id1> <sample_id2> 
    <hap_id1> <hap_id2> 
    <starting_pos_genomic> <ending_pos_genomic> 
    <genetic_length> <starting_site> <ending_site>
    """
    names = [
        'chrom',
        'sample1',
        'sample2',
        'hap_id1',
        'hap_id2',
        'genetic_start_pos',
        'genetic_end_pos',
        'genetic_seg_length',
        'starting_site',
        'ending_site'
    ]
    read_chunksize = int(1e+6)
    samples_chunksize = 1000
    denominator = samples_count // samples_chunksize + 1

    for i, chunk in enumerate(pandas.read_csv(input_ibd, header=None, names=names, sep='\t', chunksize=read_chunksize)):
        if not chunk.empty:
            process_chunk_with_hash(chunk, denominator, dest_dir)
            logging.info(f'Chunk {i} of size {read_chunksize} was written to {dest_dir} and split into {denominator} buckets')
        else:
            logging.info(f'No IBD segments were found in {input_ibd}. Nothing was written to {dest_dir}')


if __name__ == '__main__':
    try:
        snakemake
    except NameError:
        Snakemake = namedtuple('Snakemake', ['input', 'output', 'log'])
        snakemake = Snakemake(
            input={
                'ibd': '/media/data1/relatives/runs/run100k/ibis/merged_ibis.seg',
                'fam': '/media/data1/relatives/runs/run100k/preprocessed/data.fam'},
            output={'bucket_dir': '/media/data1/relatives/test100koutput.txt'},
            log=['/media/data1/relatives/test100koutput.log']
        )

    ibd = snakemake.input['ibd']
    bucket_dir = snakemake.output['bucket_dir']
    if not os.path.exists(bucket_dir):
        os.makedirs(bucket_dir)
    for file in os.listdir(bucket_dir):
        os.remove(os.path.join(bucket_dir, file))

    # the .fam file has no header, so read it with header=None instead of compensating with +1
    samples = pandas.read_table(snakemake.input['fam'], header=None)
    samples_count = samples.shape[0]
    print(snakemake.log)
    logging.basicConfig(filename=snakemake.log[0], level=logging.DEBUG, format='%(levelname)s:%(asctime)s %(message)s')

    split_by_id(ibd, samples_count, bucket_dir)
import allel
import zarr
import numpy
import os
import time


def write_ped_buffer(buffer: dict, out: str):
    start = time.time()
    with open(out, 'a') as ped:
        lines = []
        for sample, gt in buffer.items():
            #gt_str = numpy.array2string(gt, separator=' ', max_line_width=gt.shape[0], threshold=gt.shape[0])
            line = ' '.join(['0', sample, '0', '0', '0', '0']) + ' ' + ' '.join(gt) + '\n'
            print(sample, len(gt))
            lines.append(line)

        ped.writelines(lines)

    end = time.time()
    print(f'write_ped_buffer took {end - start} seconds')


def write_map_file(ids, chrom, positions, out: str):
    with open(out, 'w') as file:
        for _id, _chr, pos in zip(ids, chrom, positions):
            file.write('\t'.join([_chr, _id, '0', str(pos)]) + '\n')


if __name__ == '__main__':
    start = time.time()

    vcf = snakemake.input['vcf'][0]
    zarr_cache = snakemake.params['zarr']
    ped = snakemake.output['ped']
    map_file = snakemake.output['map_file']

    '''
    vcf = 'test_data/imputed_chr9.vcf.gz'
    zarr_cache = 'test_data/imputed_chr9.zarr'
    ped = 'test_data/imputed_chr9.ped'
    map_file = 'test_data/imputed_chr9.map'
    '''

    '''
    vcf = '/home/ag3r/vcf_to_ped/testsample.vcf.gz'
    zarr_cache = '/home/ag3r/vcf_to_ped/testsample.zarr'
    ped = '/home/ag3r/vcf_to_ped/testsample.ped'
    map_file = '/home/ag3r/vcf_to_ped/testsample.map'
    '''

    if os.path.exists(ped):
        os.remove(ped)

    allel.vcf_to_zarr(
        input=vcf,
        output=zarr_cache,
        overwrite=True,
        fields=['variants/CHROM', 'variants/ID', 'variants/POS', 'variants/REF', 'variants/ALT', 'calldata/GT', 'samples']
    )

    array = zarr.open_group(zarr_cache, mode='r')

    samples_in_chunk = 100
    ids = array['variants/ID'][:]
    chrom = array['variants/CHROM'][:]
    positions = array['variants/POS'][:]
    write_map_file(ids, chrom, positions, map_file)

    # only for the single chromosome!
    print(f'written a total of {len(positions)} variants from chr{chrom[0]} to {map_file}')

    for sample_start in range(0, len(array['samples']), samples_in_chunk):
        sample_end = min(len(array['samples']), sample_start + samples_in_chunk)
        samples = array['samples'][sample_start: sample_end]
        ped_buffer = {}

        # [variants]
        ref = array['variants']['REF'][:].reshape(-1, 1)
        # [variants, alt_count]
        alt = array['variants']['ALT'][:]

        # [variants, alt_count + 1]
        alleles = numpy.hstack([ref, alt])
        alleles[alleles == ''] = ' '
        # [variants, samples, 2] # 2 because we use biallelic only
        gt = array['calldata']['GT'][:, sample_start: sample_end, :]
        for sample_idx, sample in enumerate(samples):

            # [variants, 2]
            line = numpy.zeros((gt.shape[0], 2), dtype=alleles.dtype)

            # [variants] allele index per haplotype (0 = REF, 1 = ALT); fwd/bck are the two haplotypes
            fwd_idx = gt[:, sample_idx, 0].astype(bool)
            bck_idx = gt[:, sample_idx, 1].astype(bool)


            #print(gt.shape, alleles.shape, gt[:, sample_idx, 0].shape)
            # [variants] ref_line contains REF alleles, alt_line contains ALT alleles
            ref_line, alt_line = alleles[~fwd_idx, 0], alleles[fwd_idx, 1]
            # we are setting first haplotype ref and alt variants in corresponding positions
            line[~fwd_idx, 0] = ref_line
            line[fwd_idx, 0] = alt_line

            ref_line, alt_line = alleles[~bck_idx, 0], alleles[bck_idx, 1]
            line[~bck_idx, 1] = ref_line
            line[bck_idx, 1] = alt_line

            line_array = line.ravel()
            #print(line_array[:5], line_array.shape)

            #for variant_idx in range(gt.shape[0]):
            #    fwd_idx, bck_idx = gt[variant_idx, sample_idx, 0], gt[variant_idx, sample_idx, 1]
            #    fwd = alleles[variant_idx, fwd_idx]
            #    ped_line.append(fwd)

            #    bck = alleles[variant_idx, bck_idx]
            #    ped_line.append(bck)

            ped_buffer[sample] = line_array

        write_ped_buffer(ped_buffer, ped)
        print(f'written {len(ped_buffer)} samples to {ped}')
    end = time.time()
    print(f'total execution time is {end - start:.4f} seconds')
    print(f'written a total of {len(array["samples"])} samples and {len(array["variants"]["REF"])} variants to {ped}')
Name: grape
Version: v1.8
URL: https://github.com/genxnetwork/grape
Maintainers: public
Copyright: Public Domain
License: GNU General Public License v3.0