Snakemake workflow to cluster genomes (e.g., metagenome-assembled genomes) into species and strains


Authors

Usage

Step 1: Install workflow

If you simply want to use this workflow, download and extract the latest release. If you intend to modify and further develop this workflow, fork this repository. Please consider providing any generally applicable modifications via a pull request.

In any case, if you use this workflow in a paper, don't forget to give credit to the authors by citing the URL of this repository and, if available, its DOI (see above).

Step 2: Configure workflow

Configure the workflow according to your needs by editing the file config.yaml.
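
The code snippets further down reference a handful of configuration keys. As a rough, illustrative sketch only (the values shown are assumptions, not the workflow's defaults), they can be pictured as the Python mapping the rules access:

# Illustrative sketch of config keys referenced by the rules on this page; values are assumptions.
config = {
    "fasta_extension": ".fasta",  # extension of the genome files in the genomes/ folder
    "quality_score": "Completeness - 5 * Contamination",  # example pandas eval() formula over tables/Genome_quality.tsv columns
    "strains_threshold": "auto",  # ANI threshold for strain clustering, or "auto" to pick it via silhouette score
}

# accessed in the rules as, e.g.:
# config["fasta_extension"], config["quality_score"], config.get("strains_threshold", "auto")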

Step 3: Execute workflow

Test your configuration by performing a dry-run via

snakemake -n

Execute the workflow locally via

snakemake --cores $N

using $N cores, or run it in a cluster environment via

snakemake --cluster qsub --jobs 100

or

snakemake --drmaa --jobs 100

See the Snakemake documentation for further details.

Testing

Test cases are in the subfolder .test. They should be executed via continuous integration with Travis CI.

Code Snippets

run:
    from common.genome_stats import get_many_genome_stats
    import pandas as pd

    # read only the first column (the genome names) and use it as index
    d = pd.read_csv(input[0], sep='\t', index_col=0, usecols=[0])
    # build the path to each genome fasta file
    filenames = d.index.map(lambda s: f"genomes/{s}{config['fasta_extension']}")
    del d
    get_many_genome_stats(filenames, output[0], threads)
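
get_many_genome_stats lives in the workflow's common package and is not shown on this page. The following is a hypothetical, self-contained sketch of what such a helper could look like (per-genome length, contig count and N50 computed in parallel and written as one TSV); only the Length column is demonstrably required by the minimap step further down, the other column names are assumptions.

import os
import gzip
from multiprocessing import Pool

import pandas as pd


def read_contig_lengths(fasta_file):
    # contig lengths of a (possibly gzipped) fasta file
    lengths, current = [], 0
    open_ = gzip.open if fasta_file.endswith(".gz") else open
    with open_(fasta_file, "rt") as fh:
        for line in fh:
            if line.startswith(">"):
                if current:
                    lengths.append(current)
                current = 0
            else:
                current += len(line.strip())
    if current:
        lengths.append(current)
    return lengths


def genome_stats(fasta_file):
    # total length, number of contigs and N50 of one genome
    lengths = sorted(read_contig_lengths(fasta_file), reverse=True)
    total = sum(lengths)
    cumulative, n50 = 0, 0
    for length in lengths:
        cumulative += length
        if cumulative >= total / 2:
            n50 = length
            break
    return {"Length": total, "N_contigs": len(lengths), "N50": n50}


def get_many_genome_stats(filenames, output_file, threads=1):
    # compute stats for many genomes in parallel and write them as one TSV
    filenames = list(filenames)
    with Pool(threads) as pool:
        stats = pool.map(genome_stats, filenames)
    names = [os.path.basename(f).rsplit(".", 1)[0] for f in filenames]
    table = pd.DataFrame(stats, index=names)
    table.index.name = "Genome"
    table.to_csv(output_file, sep="\t")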
script:
    "../scripts/many_minimap.py"
run:

    import pandas as pd
    import shutil

    sep = '\t'

    print(input)

    # write the first table with its header
    D = pd.read_csv(input[0], sep=sep)
    n_cols = D.shape[1]
    D.to_csv(output[0], sep=sep)

    # append the remaining tables without repeating the header
    for file in input[1:]:
        D = pd.read_csv(file, sep=sep)
        assert n_cols == D.shape[1], f"File {file} doesn't have the same number of columns as the one before. Cannot concatenate."
        D.to_csv(output[0], sep=sep, header=False, mode='a')

    # remove the directory holding the partial tables
    in_dir = os.path.dirname(input[0])
    shutil.rmtree(in_dir)
shell:
    "snakemake -s {params.path}/rules/mummer.smk "
    "--config comparison_list='{input.comparison_list}' "
    " genome_folder='{input.genome_folder}' "
    " subset={wildcards.subset} "
    " genome_stats={input.genome_stats} "
    " --rerun-incomplete "
    "-j {threads} --nolock 2> {log}"
run:
    import pandas as pd
    import shutil

    # load each per-subset mummer table, keyed by its simplified file name
    Mummer = {}
    for file in input:
        Mummer[io.simplify_path(file)] = pd.read_csv(file, index_col=[0, 1], sep='\t')

    # concatenate all tables and drop the file-name level of the index
    M = pd.concat(Mummer, axis=0)
    M.index = M.index.droplevel(0)
    M.to_csv(output[0], sep='\t')

    # remove the directory with the per-subset tables
    ani_dir = os.path.dirname(input[0])
    shutil.rmtree(ani_dir)
    # sns.jointplot('ANI', 'Coverage', data=M.query('ANI>0.98'), kind='hex', gridsize=100, vmax=200)
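
The commented-out jointplot above hints at how the merged table can be inspected. A minimal sketch, assuming the merged TSV contains ANI and Coverage columns and that seaborn is installed (the file path is a placeholder):

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# placeholder path: point this at the actual output of the rule above
M = pd.read_csv("mummer_dists.tsv", sep="\t", index_col=[0, 1])

# hexbin of ANI against alignment coverage for near-identical genome pairs
sns.jointplot(x="ANI", y="Coverage", data=M.query("ANI > 0.98"), kind="hex")
plt.savefig("ani_vs_coverage.png")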
script:
    "../scripts/group_species.py"
script:
    "../scripts/group_strains.py"
run:
    mapping = get_representative_mapping(input.cluster_file, wildcards.resolution_level)

    output_dir = output.dir
    os.makedirs(output_dir)
    # symlink targets are expressed relative to the output directory
    input_dir = os.path.relpath(input.dir, start=output_dir)

    # link only the representative genome of each cluster
    for genome in mapping.unique():
        os.symlink(os.path.join(input_dir, genome + '.fasta'),
                   os.path.join(output_dir, genome + '.fasta'))
run:

    import pandas as pd

    df = pd.read_csv(input.cluster_file, sep='\t')

    os.makedirs(output.dir)

    # concatenate the representative strains of each species into one fasta per species
    for species_name, sp in df.groupby('Species'):
        with open(os.path.join(output.dir, species_name + '.fasta'), 'w') as fasta_out:
            for genome in sp.Representative_Strains.unique():
                with open(os.path.join(input.dir, genome + '.fasta')) as fasta_in:
                    fasta_out.write(fasta_in.read())
run:
    from glob import glob

    # write one genome fasta path per line
    with open(output[0], 'w') as f:
        f.write('\n'.join(glob(f"{input[0]}/*{config['fasta_extension']}")) + '\n')
shell:
    "mash sketch -o {params.out_name} -p {threads} -s {params.s} -k {params.k} -l {input[0]} 2> {log}"
shell:
    "mash dist -p {threads} -d {params.d} "
    "{input.genomes} {input.genomes} > {output[0]} 2> {log}"
shell:
    "bindash sketch "
    "--outfname={output[0]} "
    "--nthreads={threads} "
    "--sketchsize64={params.sketchsize64} "
    "--kmerlen={params.k} "
    "{params.extra} "
    "--listfname={input[0]} 2> {log}"
shell:
    "bindash dist "
    "--nthreads={threads} "
    "--mthres={params.d} "
    "--outfname={output} {input[0]} 2> {log}"
run:
    # pick the parser matching the distance/ANI tool
    if wildcards.tool == "mummer":
        open_function = gd.load_mummer
    elif wildcards.tool == "minimap":
        open_function = gd.load_minimap
    elif wildcards.tool == "bindash":
        open_function = gd.load_bindash
    elif wildcards.tool == "fastani":
        open_function = gd.load_fastani
    elif wildcards.tool == "mash":
        open_function = gd.load_mash
    else:
        raise Exception(
            f"Don't know how to load table from tool: {wildcards.tool}"
        )

    M = open_function(input[0]).drop(['Identity'], axis=1)
    M.to_parquet(output[0], engine="pyarrow")
run:
    F = gd.load_mash(input[0])
    G = gd.to_graph(F.query(f"Distance<={params.threshold}"))
    if hasattr(G, 'selfloop_edges'):
        G.remove_edges_from(G.selfloop_edges())

    os.makedirs(output[0])

    # split the genome pairs (graph edges) into files of params.N pairs each
    fout = None
    for i, e in enumerate(G.edges()):
        if (i % int(params.N)) == 0:
            n = int(i // params.N) + 1
            if fout is not None:
                fout.close()
            fout = open(f"{output[0]}/subset_{n}.txt", 'w')

        fout.write("\t".join(sorted(e)) + '\n')

    if fout is not None:
        fout.close()
From line 147 of rules/sketch.smk
scripts/group_species.py
import pandas as pd

import scipy.spatial as sp
import scipy.cluster.hierarchy as hc
from sklearn.metrics import silhouette_score
import numpy as np
from common import genome_pdist as gd
import networkx as nx


def automatic_cluster_species(
    Dist, seed_thresholds=[0.92, 0.97], linkage_method="average"
):

    linkage = hc.linkage(sp.distance.squareform(Dist), method=linkage_method)

    def get_Nclusters(threshold):
        labels = hc.fcluster(linkage, (1 - threshold), criterion="distance")
        return max(labels)

    N_range = [get_Nclusters(t) for t in seed_thresholds]

    if (N_range[1] - N_range[0]) > 100:
        print(f"Need to evaluate more than {N_range[1]-N_range[0]} thresholds")

    assert ~np.isnan(N_range).any(), "N range is not defined"

    Scores = gd.evaluate_clusters_range(
        np.arange(min(N_range), max(N_range) + 1), Dist, linkage_method=linkage_method
    )

    if N_range[0] == N_range[1]:
        labels = hc.fcluster(linkage, (1 - seed_thresholds[0]), criterion="distance")
    else:

        N_species = Scores.Silhouette_score.idxmax()
        labels = hc.fcluster(linkage, N_species, criterion="maxclust")

    return Scores, labels


def threshold_based_clustering(Dist, threshold, linkage_method="average"):
    assert (threshold > 0.9) & (
        threshold < 1
    ), f"threshold should be between 0.9 and 1 or 'auto', threshold was {threshold}"
    linkage = hc.linkage(sp.distance.squareform(Dist), method=linkage_method)
    labels = hc.fcluster(linkage, (1 - threshold), criterion="distance")
    Scores = gd.evaluate_clusters_thresholds(
        [threshold], Dist, linkage_method=linkage_method
    )
    return Scores, labels


if __name__ == "__main__":

    linkage_method = snakemake.params.linkage_method
    threshold = snakemake.params.threshold
    quality_score_formula = snakemake.config["quality_score"]

    Q = pd.read_csv(snakemake.input.genome_info,sep='\t',index_col=0)
    quality_score = Q.eval(quality_score_formula)

    assert (
        not quality_score.isnull().any()
    ), "I have NA values for the quality score; it seems not all of the values defined in the quality_score_formula are present for all entries in tables/Genome_quality.tsv"



    # load genome distances
    M= gd.load_parquet(snakemake.input[0])

    # genome distance to graph if ANI > 0.9
    G= gd.to_graph(M.query(f"Identity>=@threshold"))
    if hasattr(G,'selfloop_edges'):
        G.remove_edges_from(G.selfloop_edges())


    # prepare table for species number
    mag2Species = pd.DataFrame(index=Q.index, columns=["SpeciesNr", "Species"])
    mag2Species.index.name = "genome"

    last_species_nr=0

    for cc in nx.connected_components(G):


        Mcc = M.loc[( M.index.levels[0].intersection(cc),
                     M.index.levels[1].intersection(cc)
                     ),
                ]

        Dist = 1 - gd.pairewise2matrix(Mcc, fillna=Mcc.Identity.min())

        if threshold == "auto":
            Scores, labels = automatic_cluster_species(Dist, linkage_method=linkage_method)
        else:
            Scores, labels = threshold_based_clustering(
                Dist, threshold, linkage_method=linkage_method
            )

        # enter values of labels to species table
        mag2Species.loc[Dist.index, "SpeciesNr"] = labels +last_species_nr
        last_species_nr = mag2Species.SpeciesNr.max()


    missing_species = mag2Species.SpeciesNr.isnull()
    N_missing_species = sum(missing_species)
    mag2Species.loc[missing_species, "SpeciesNr"] = (
        np.arange(last_species_nr, last_species_nr + N_missing_species) + 1
    )

    Scores["N_clusters"] += N_missing_species
    Scores.to_csv(snakemake.output.scores, sep="\t", index=False)

    # assert (
    #     Scores.loc[Scores.Silhouette_score.idxmax(), "N_clusters"]
    #     == mag2Species.SpeciesNr.max()
    # ), "error in calculation of N species"
    print(f"Identified { mag2Species.SpeciesNr.max()} species")

    # create proper species names
    n_leading_zeros = len(str(max(labels)))
    format_int = "sp{:0" + str(n_leading_zeros) + "d}"
    mag2Species["Species"] = mag2Species.SpeciesNr.apply(format_int.format)

    # select representative
    mag2Species["Representative_Species"] = gd.best_genome_from_table(
        mag2Species.Species, quality_score
    )

    mag2Species.to_csv(snakemake.output.cluster_file, sep="\t")

    Q= Q.join(mag2Species)
    Q.to_csv(snakemake.input.genome_info,sep='\t')
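
Both clustering helpers above boil down to the same core step: hierarchical clustering of a symmetric distance matrix, cut either at a fixed distance of 1 - threshold or at the number of clusters with the best silhouette score. A self-contained toy example of the fixed-threshold path (genome names and distances are invented; the silhouette-based path additionally needs gd.evaluate_clusters_range from the workflow's common module):

import pandas as pd
import scipy.spatial as sp
import scipy.cluster.hierarchy as hc

# invented symmetric distance matrix (1 - ANI) for four genomes
genomes = ["MAG1", "MAG2", "MAG3", "MAG4"]
Dist = pd.DataFrame(
    [[0.00, 0.02, 0.10, 0.11],
     [0.02, 0.00, 0.09, 0.12],
     [0.10, 0.09, 0.00, 0.01],
     [0.11, 0.12, 0.01, 0.00]],
    index=genomes, columns=genomes,
)

threshold = 0.95  # ANI threshold: pairs closer than 1 - threshold end up in the same cluster
linkage = hc.linkage(sp.distance.squareform(Dist), method="average")
labels = hc.fcluster(linkage, 1 - threshold, criterion="distance")

print(pd.Series(labels, index=genomes))
# MAG1/MAG2 form one cluster and MAG3/MAG4 another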
scripts/group_strains.py
import pandas as pd

import scipy.spatial as sp
import scipy.cluster.hierarchy as hc

import numpy as np
from common import genome_pdist as gd




if __name__ == "__main__":

    linkage_method = snakemake.params.linkage_method
    threshold = 0.995
    quality_score_formula = snakemake.config["quality_score"]



    M = gd.load_parquet(snakemake.input.dists)

    mag2species = pd.read_csv(snakemake.input.mag2species, index_col=0, sep="\t")
    Strains = mag2species.copy()

    M["Species"] = mag2species.loc[M.index.get_level_values(0), "Species"].values
    M["Species2"] = mag2species.loc[M.index.get_level_values(1), "Species"].values
    M = M.query("Species==Species2")



    strains_threshold= snakemake.config.get("strains_threshold",'auto')

    # simple threshold-based splitting
    if strains_threshold != "auto":

        for species, data in M.groupby("Species"):

            ID = data.Identity.unstack()

            all_indexes = ID.index.union(ID.columns)
            ID = ID.reindex(index=all_indexes, columns=all_indexes).fillna(0)
            ID = ID + ID.T
            ID.values[np.eye(ID.shape[0], dtype=bool)] = 1
            Dist = 1 - ID.fillna(0.8)
            Dist.clip(0, 1, inplace=True)
            linkage = hc.linkage(sp.distance.squareform(Dist), method=linkage_method)

            labels = hc.fcluster(linkage, (1 - float(strains_threshold)) / 2, criterion="distance")

            Strains.loc[ID.index, "StrainNr"] = labels


    # Detect automatically threshold
    else:

        # detect strain gap
        fraction_below_threshold = (M.Identity < threshold).groupby(
            M.Species
        ).sum() / M.groupby("Species").size()
        have_strain_gap = fraction_below_threshold > 0.05



        for species, data in M.groupby("Species"):

            if have_strain_gap[species]:

                ID = data.Identity.unstack()

                all_indexes = ID.index.union(ID.columns)
                ID = ID.reindex(index=all_indexes, columns=all_indexes).fillna(0)
                ID = ID + ID.T
                ID.values[np.eye(ID.shape[0], dtype=bool)] = 1
                Dist = 1 - ID.fillna(0.8)
                Dist.clip(0, 1, inplace=True)
                linkage = hc.linkage(sp.distance.squareform(Dist), method=linkage_method)

                labels = hc.fcluster(linkage, (1 - threshold) / 2, criterion="distance")

                Nmax = max(labels)

                assert Nmax < 500, "Need to evaluate more than 500 thresholds"

                assert ~np.isnan(Nmax), "N range is not defined"

                Scores = gd.evaluate_clusters_range(
                    np.arange(2, Nmax + 1), Dist, linkage_method=linkage_method
                )

                N_strains = Scores.Silhouette_score.idxmax()

                if ~np.isnan(N_strains):
                    labels = hc.fcluster(linkage, N_strains, criterion="maxclust")

                Strains.loc[ID.index, "StrainNr"] = labels


    # Finalize naming

    Strains["Strain"] = (
        "Strain"
        + Strains.SpeciesNr.astype(int).astype("str")
        + "_"
        + Strains.StrainNr.dropna().astype(int).astype("str")
    )
    Strains.loc[Strains.Strain.isnull(), "Strain"] = Strains.loc[
        Strains.Strain.isnull(), "Species"
    ]

    print(f"Identified { Strains.Strain.unique().size} strains")

    Q = pd.read_csv(snakemake.input.genome_info,sep='\t',index_col=0)
    quality_score = Q.eval(quality_score_formula)

    Strains["Representative_Strain"] = gd.best_genome_from_table(
        Strains.Strain, quality_score
    )

    Strains.to_csv(snakemake.output.mag2strain, sep="\t")


    for col in ["Strain","Representative_Strain"]:
        Q[col] = Strains[col]

    Q.to_csv(snakemake.input.genome_info,sep='\t')
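
To make the automatic "strain gap" detection above concrete: a species is only split into strains when more than 5% of its within-species genome pairs have an identity below the 0.995 threshold. A toy, self-contained illustration with invented identities:

import pandas as pd

# invented within-species pairwise identities (one row per genome pair)
M = pd.DataFrame({
    "Species": ["sp1"] * 4 + ["sp2"] * 4,
    "Identity": [0.999, 0.998, 0.999, 0.999,   # sp1: essentially clonal
                 0.999, 0.990, 0.985, 0.991],  # sp2: clear sub-clusters
})

threshold = 0.995
fraction_below_threshold = (
    (M.Identity < threshold).groupby(M.Species).sum() / M.groupby("Species").size()
)
have_strain_gap = fraction_below_threshold > 0.05

print(have_strain_gap)
# sp1    False  -> kept as a single strain
# sp2    True   -> passed on to hierarchical clustering into strains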
scripts/many_minimap.py
import sys, os
import pandas as pd

sys.stdout = open(snakemake.log[0], "w")
sys.stderr = open(snakemake.log[0], "a")

from snakemake.shell import shell
from common.alignments import parse_paf_files


def run_minimap(
    ref,
    query,
    paf,
    options=snakemake.params.minimap_extra,
    threads=snakemake.threads,
    log=snakemake.log[0],
):

    os.makedirs(os.path.dirname(paf), exist_ok=True)

    shell(
        "minimap2 {options} -t {threads} {ref} {query}  > {paf}.tmp 2> {log} ;"
        "mv {paf}.tmp {paf} 2> {log}"
    )


def many_minimap(
    alignment_list, genome_folder, paf_folder, genome_stats_file, extension=".fasta"
):

    stats = pd.read_csv(genome_stats_file, index_col=0, sep="\t")

    os.makedirs(paf_folder, exist_ok=True)

    paf_files = []

    with open(alignment_list) as f:
        for line in f:

            # use the larger genome as reference and the smaller one as query
            genome_pair = line.strip().split()
            genome_query, genome_ref = (
                stats.loc[genome_pair, "Length"].sort_values().index
            )

            paf_file = os.path.join(paf_folder, genome_ref, genome_query + ".paf")

            if not os.path.exists(paf_file):

                # run minimap
                fasta_ref = os.path.join(genome_folder, genome_ref + extension)
                fasta_query = os.path.join(genome_folder, genome_query + extension)
                run_minimap(fasta_ref, fasta_query, paf_file)

            paf_files.append(paf_file)

    return paf_files


paf_files = many_minimap(
    snakemake.input.alignment_list,
    snakemake.input.genome_folder,
    snakemake.params.paf_folder,
    snakemake.input.genome_stats,
    snakemake.params.extension,
)


parse_paf_files(
    paf_files, snakemake.input.genome_stats, snakemake.output.alignments_stats
)
URL: https://github.com/SilasK/FastDrep
Name: fastdrep
Version: 0.1
License: MIT License