Oxford Nanopore Variant Calling and Methylation Pipeline


This is a Snakemake pipeline designed to take Oxford Nanopore Technologies (ONT) data from FASTQs to variant calls. In addition to traditional SNVs and indels, the pipeline also calls methylation using nanopolish.

The Snakefile is under the workflow/ directory.

The pipeline performs the following functions:

Alignment

  • minimap2

SV Calling

  • Sniffles

  • CuteSV

  • SVIM

SNV/indel Calling

  • Clair3

Read-phasing

  • longphase

Methylation tag linking

  • In-house pipeline

There are dummy target rules which can be added to the snakemake command to run specific stages of the pipeline. They are listed below, followed by an example invocation:

  • all_align

    • Runs minimap2 for all samples
  • all_sv

    • Runs all SV callers for all samples
  • all_snv

    • Runs Clair3 for all samples
  • all_haplotag

    • Runs haplotype phasing using longphase for all samples
  • all_haplotag_methyl

    • Haplotags BAMs and links methylation tags for all samples
  • all_minimap_methyl

    • Links methylation tags to unphased BAMs for all samples
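
For example, to run only the alignment stage, or the SV and SNV calling stages, for all samples, pass the target rules directly to snakemake (a minimal sketch; the --cores value and the --use-conda flag are assumptions and should be adjusted for your environment or cluster profile):

snakemake --use-conda --cores 32 all_align
snakemake --use-conda --cores 32 all_sv all_snv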

Code Snippets

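# Scatter: split a .fai index into batches of sequence names (scripts/split_faidx.py,
# shown in full at the bottom of this page); the batch files drive the scattered alignment jobs.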
script:
    "../scripts/split_faidx.py"
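
# Alignment: extract one batch of reads with samtools fqidx, align with minimap2
# (map-ont preset, MD tags, =/X CIGAR operators, no secondary alignments), and coordinate-sort.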
shell:
    """
    samtools fqidx {input.fastq} -r {input.batch_file} > $TMPDIR/scatteritem.fastq
    minimap2 -t {threads} --MD --secondary=no --eqx -x map-ont -a {input.ref} $TMPDIR/scatteritem.fastq | samtools view -S -b | samtools sort -T $TMPDIR > {output.sorted_bam}
    """
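
# Merge the coordinate-sorted per-batch BAMs from the scattered alignment jobs.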
shell:
    """
    samtools merge -@{threads} {output.scatter_merged_bam} {input.sorted_bams}
    """
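
# Merge the scatter-merged BAMs into the final merged BAM.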
shell:
    """
    samtools merge -@{threads} {output.merged_bam} {input.scatter_merged_bam}
    """
shell:
    """
    samtools index {input.merged_bam}
    """
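
# Methylation tag linking: copy base-modification (Mm/MM) tags from the methylation BAMs
# onto the aligned BAM (scripts/append_mod_tags.py, shown in full below).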
script:
    "../scripts/append_mod_tags.py"
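
# Haplotagging: longphase haplotag assigns reads to haplotypes using the phased SNV calls
# and the SV calls; the -o argument is the output path with its .bam suffix stripped,
# since longphase appends the extension itself.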
shell:
    """
    {params.script_dir}/longphase haplotag --snp-file={input.snv_vcf} --bam-file={input.bam} --qualityThreshold=1 -t {threads} --sv-file={input.sv_vcf} -o $( echo {output.bam} | sed 's/.bam//' )
    """
shell:
    """
    samtools index {input.merged_bam}
    """
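
# SNV/indel calling: run Clair3 per contig with the ont_guppy5 model and phasing enabled.
# If no phased VCF was produced, fall back to copying the unphased VCF; if neither exists,
# write a header-only VCF from the pileup output so downstream rules still get a file.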
shell:
    """
    run_clair3.sh --bam_fn={input.merged_bam} --sample_name={wildcards.sample} --ref_fn={input.ref} --threads={threads} --platform=ont --model_path=$(dirname $( which run_clair3.sh  ) )/models/ont_guppy5 --output=$(dirname {output.vcf}) --ctg_name={wildcards.chrom} --enable_phasing
    if [[ ( -f $( echo {output.vcf} | sed 's/phased_//' ) ) && ( ! -f {output.vcf} ) ]]; then
        cp $( echo {output.vcf} | sed 's/phased_//' ) {output.vcf}
    elif [[ ( ! -f $( echo {output.vcf} | sed 's/phased_//' ) ) && ( ! -f {output.vcf} ) ]]; then
        zcat $(dirname {output.vcf})/pileup.vcf.gz | grep ^# | bgzip -c > {output.vcf}
    fi
    """
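
# Concatenate the per-contig Clair3 VCFs into a single VCF.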
shell:
    """
    bcftools concat -O v -o {output.vcf} {input.vcf}
    """
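
# SV calling with Sniffles, keeping the names of supporting reads (--output-rnames).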
shell:
    """
    sniffles -i {input.merged_bam} --reference {input.ref} --output-rnames -v {output.vcf}
    """
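
# SV calling with cuteSV. The reference is decompressed first (zcat for gzipped input,
# falling back to plain cat), presumably because cuteSV expects an uncompressed FASTA;
# the clustering parameters match the settings cuteSV suggests for ONT data.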
shell:
    """
    zcat {input.ref} > {output.cuteSV_ref} || cat {input.ref} > {output.cuteSV_ref}
    cuteSV -t {threads} --genotype --max_cluster_bias_INS 100 --diff_ratio_merging_INS 0.3 --max_cluster_bias_DEL 100 --diff_ratio_merging_DEL 0.3 {input.merged_bam} {output.cuteSV_ref} {output.cuteSV_vcf} $( dirname {output.cuteSV_vcf} )
    """
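
# SV calling with SVIM: svim alignment writes its results into the directory derived from
# the temporary VCF path, and the result is hard-linked to the final output path.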
shell:
    """
    svim alignment --sample {wildcards.sample} $( dirname {output.vcf_tmp} ) {input.merged_bam} {input.ref}
    cp -l {output.vcf_tmp} {output.vcf}
    """
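
# Sort a VCF, compress it with bgzip, and index it with tabix.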
shell:
    """
    bcftools sort -o /dev/stdout -O v {input.vcf} | bgzip -c > {output.zipped}
    tabix {output.zipped}
    """
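
# Overview (this appears to be scripts/append_mod_tags.py, referenced above): store the
# tags of every read that carries an Mm/MM base-modification tag in a SQLite table keyed
# by read name, split the aligned BAM into ~100 MB chunks, re-attach the stored tags to
# matching reads in each chunk in parallel, then merge the linked chunks into the final
# output BAM.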
import os
import sys
import time
import itertools
import pickle
import sqlite3

from multiprocessing import Pool

import pysam

# LOGGING
sys.stdout = open(snakemake.log[0], "w")


def create_database(db_name):
    if os.path.exists(db_name):
        os.remove(db_name)

    conn = sqlite3.connect(db_name)
    c = conn.cursor()
    c.execute("CREATE TABLE meth_tags (qname TEXT PRIMARY KEY, tag BLOB)")

    # commit changes and close connection
    conn.commit()
    conn.close()


def get_time():
    time_rn = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())
    return time_rn


def fetch_modified_bases(modified_obj, db_name) -> None:
    """
    Fetch base modification tags Mm & Ml
    :param modified_obj: An unsorted bam pysam object with just methylation tags
    :param db_name:
    :return: None
    """
    print(f"Opening {modified_obj.filename.decode()} to fetch tags. {get_time()}")

    # Start database connection
    conn = sqlite3.connect(db_name)
    c = conn.cursor()

    for read in modified_obj.fetch(until_eof=True):
        if read.has_tag("Mm") or read.has_tag("MM"):
            tags = read.get_tags()
            qname = read.query_name

            # serialize the tags list
            serialized_list = pickle.dumps(tags)
            c.execute(
                "INSERT INTO meth_tags VALUES (?, ?)",
                (str(qname), sqlite3.Binary(serialized_list)),
            )

    modified_obj.close()

    # commit changes and close connection
    conn.commit()
    conn.close()

    print(
        f"Base modification tags fetched for {modified_obj.filename.decode()}. {get_time()}"
    )


def write_linked_tags(bam, db_name, out_file) -> None:
    """
    Write out merged bam with Mm tags and possibly Ml, and its index.
    :param bam: An aligned bam
    :param db_name:
    :param out_file:
    :return: None
    """

    # Connect to db
    conn = sqlite3.connect(db_name)
    c = conn.cursor()

    appended_tags = pysam.AlignmentFile(out_file, "wb", template=bam)
    for read in bam.fetch(until_eof=True):
        result = c.execute(
            "SELECT tag FROM meth_tags WHERE qname = ?", (str(read.qname),)
        ).fetchone()
        if result:
            deserialized_tag = pickle.loads(result[0])
            read.set_tags(read.get_tags() + deserialized_tag)
        appended_tags.write(read)

    print(f"File written to: {out_file}")
    appended_tags.close()

    # write index
    pysam.index(out_file)
    print(f"Index written for {out_file}.bai")


def collect_tags(methyl_sn_input: list, db_name: str) -> None:
    # methyl_sn_input: snakemake input
    """
    Collect optional tags from ONT bam with methyl calls
    :param methyl_sn_input: a list of file paths pointing to methyl bam
    :param db_name:
    :return: None
    """

    if len(methyl_sn_input) != 1:
        for bam in methyl_sn_input:
            methyl_bam = pysam.AlignmentFile(bam, "rb", check_sq=False)
            fetch_modified_bases(methyl_bam, db_name)
    else:
        methyl_bam = pysam.AlignmentFile(methyl_sn_input[0], "rb", check_sq=False)
        fetch_modified_bases(methyl_bam, db_name)


def make_subset_bams(input_bam, prefix) -> list[str]:
    subset_size = 100 * 1024 * 1024  # 100MB in bytes

    if os.path.getsize(input_bam.filename.decode()) < subset_size:
        subset_size = int(subset_size / 10)

    subset_idx = 0
    subset_size_bytes = 0
    current_subset = None

    bam_file_list = []

    for read in input_bam:
        # If the current subset is None or its size has exceeded the subset size, create a new subset
        if current_subset is None or subset_size_bytes >= subset_size:
            # If this is not the first subset, close the previous subset file
            if current_subset is not None:
                current_subset.close()
                pysam.index(current_subset.filename.decode())

            # Create a new subset file with a name based on the subset index
            subset_idx += 1
            current_subset = pysam.AlignmentFile(
                f"{prefix}_tmp.{subset_idx}.bam", "wb", template=input_bam
            )
            bam_file_list.append(f"{prefix}_tmp.{subset_idx}.bam")

        # Write the current read to the current subset file
        current_subset.write(read)
        subset_size_bytes = os.path.getsize(current_subset.filename.decode())

    # Close the last subset file
    current_subset.close()
    pysam.index(current_subset.filename.decode())

    input_bam.close()

    return bam_file_list


def combine_the_chunked(bams: list[str], merge_output: str):
    aln_bams = [pysam.AlignmentFile(x, check_sq=False) for x in bams]

    out_bam = pysam.AlignmentFile(merge_output, "wb", template=aln_bams[0])
    for bam in aln_bams:
        for records in bam:
            out_bam.write(records)
        bam.close()

    out_bam.close()
    pysam.index(merge_output)


def run_pool(bam_file: str, db_name, output_file) -> None:
    aln_bam = pysam.AlignmentFile(bam_file, "rb")

    write_linked_tags(aln_bam, db_name, output_file)

    # wait for the file to become available
    while not os.path.exists(bam_file):
        time.sleep(1)

    # file is available, remove it
    clean_up_temps([bam_file])


def clean_up_temps(files: list, suffix=".bai"):
    for f in files:
        index = f + suffix
        try:
            os.remove(f)
            os.remove(index)
            print("removed: ", f)
            print("removed: ", index)
        except FileNotFoundError:
            pass


def main():
    # Grabbing from snakemake
    threads = snakemake.threads
    methyl_collection = snakemake.input.methyl_bam
    aln_bam = snakemake.input.aln_bam
    bam = pysam.AlignmentFile(aln_bam, check_sq=False)
    prefix = os.path.join(snakemake.resources.tmpdir, snakemake.wildcards.sample)
    final_output = snakemake.output.linked_bam

    db_name = f"{prefix}-meth_tags.db"
    create_database(db_name=db_name)

    # Populate the database with methylation tags
    collect_tags(methyl_collection, db_name=db_name)

    # Make the chunks
    chunked_bams_names = make_subset_bams(input_bam=bam, prefix=prefix)
    link_bam_output_names = [
        x.replace("_tmp.", "_tmp-linked.") for x in chunked_bams_names
    ]

    with Pool(threads) as p:
        p.starmap(
            run_pool,
            zip(chunked_bams_names, itertools.repeat(db_name), link_bam_output_names),
        )
        p.close()
        p.join()

    combine_the_chunked(bams=link_bam_output_names, merge_output=final_output)

    # CLEANING UP!
    clean_up_temps(link_bam_output_names)
    clean_up_temps([db_name])


if __name__ == "__main__":
    sys.exit(main())
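
# This appears to be scripts/split_faidx.py (referenced above): distribute the sequence
# names from a .fai index round-robin across the requested number of batch files.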
if __name__ == "__main__":
    NIDS = len(snakemake.output.batches)

    batch_dict = {}

    for i in range(NIDS):
        batch_dict[i] = []

    with open(snakemake.input.fai, "r") as infile:
        fai_list = [line.split("\t")[0] for line in infile]

    for j in range(len(fai_list)):
        batch_dict[j % NIDS].append(fai_list[j])

    outs = [open(f, "w+") for f in snakemake.output.batches]

    for i in range(NIDS):
        outs[i].write("\n".join(batch_dict[i]) + "\n")
        outs[i].close()

URL: https://github.com/EichlerLab/ONT_pipelines
Name: ont_pipelines
Version: 1
License: MIT License