Snakemake Workflow for Assigning New MAGs to Predefined Groups Based on Whole-Genome Similarity

public public 1yr ago 0 bookmarks

A Snakemake workflow for assigning new MAGs to already-defined groups based on whole-genome similarity.

I suggest using this filtering workflow before running this one.

Code Snippets

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
import os, glob
from turtle import distance
import pandas as pd
from collections import defaultdict
from multiprocessing import Pool


from snakemake.shell import shell
import logging, traceback

# Send this script's log records to the first log file listed in `snakemake.log`,
# at INFO level, with each message prefixed by a "YYYY-MM-DD HH:MM:SS" timestamp.
# NOTE(review): `snakemake` is not defined in this script — presumably it is
# injected by Snakemake when the file is run through a `script:` directive.
logging.basicConfig(
    filename=snakemake.log[0],
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Redirect messages issued through the `warnings` module to logging as well.
logging.captureWarnings(True)


def handle_exception(exc_type, exc_value, exc_traceback):
    """Log an uncaught exception; meant to be installed as ``sys.excepthook``.

    A ``KeyboardInterrupt`` (or subclass) is handed to the interpreter's
    original hook, ``sys.__excepthook__``, and is not logged. Any other
    exception is written through ``logging.error`` as a single message made of
    the text "Uncaught exception: " followed by the formatted traceback.

    Parameters
    ----------
    exc_type
        Class of the exception.
    exc_value
        The exception instance.
    exc_traceback
        Traceback object associated with the exception.
    """
    if not issubclass(exc_type, KeyboardInterrupt):
        traceback_lines = traceback.format_exception(
            exc_type, exc_value, exc_traceback
        )
        logging.error("Uncaught exception: " + "".join(traceback_lines))
    else:
        sys.__excepthook__(exc_type, exc_value, exc_traceback)


# Install exception handler
# NOTE(review): `sys`, `logger` and `snakemake` are never imported or defined in
# this script — presumably they come from the preamble Snakemake prepends to
# `script:` files; confirm before running the script any other way.
sys.excepthook = handle_exception

# Start script

logger.info('Starting preparing input data!')

# Tab-separated distance table; this section reads its 'target' and
# 'hash_ratio' columns.
distance_df = pd.read_csv(snakemake.input.distance, sep='\t')

# 'hash_ratio' is parsed as a "<found>/<total>" string. Split it once, keep both
# counts as integer columns, then overwrite 'hash_ratio' with the numeric fraction.
hash_counts = distance_df['hash_ratio'].str.split("/", expand=True)
distance_df['hash_found'] = hash_counts[0].astype('int')
distance_df['hash_total'] = hash_counts[1].astype('int')
distance_df['hash_ratio'] = distance_df['hash_found'] / distance_df['hash_total']

# Group definition table. Subspecies labels are converted to strings and
# left-padded with zeros to a width of 10 characters.
group_def = pd.read_csv(snakemake.config['group_def'], sep='\t')
group_def.subspecies = group_def.subspecies.astype('str').str.rjust(10, '0')

# Genome name = last path component of 'target' with at most two trailing
# dot-separated suffixes removed. `n` is passed by keyword because pandas 2.0
# made every argument of str.rsplit except `pat` keyword-only.
distance_df['target_genome'] = (
    distance_df['target']
    .str.rsplit("/", n=1, expand=True)[1]
    .str.rsplit(".", n=2, expand=True)[0]
)

# Left join: targets without an entry in the group table are kept and get
# missing values in the group_def columns.
distance_df = distance_df.merge(
    group_def, left_on='target_genome', right_on='genome', how='left'
)
logger.info('Finished preparing input data!')

# Accumulators for the per-query results produced further down.
output_df = pd.DataFrame()
short_df = pd.DataFrame()

def assign_group(query_MAG):
    """Pick the closest group (subspecies) for one query MAG.

    Reads the module-level ``distance_df`` table and the ``min_hash`` entry of
    ``snakemake.config``. For each subspecies hit by the query, only rows whose
    ``hash_found`` is strictly greater than ``min_hash`` are kept and the
    median of their ``dist`` values is computed. The subspecies with the
    smallest median is chosen.

    Parameters
    ----------
    query_MAG
        Value of the ``query`` column identifying the MAG to assign.

    Returns
    -------
    tuple of (pandas.DataFrame, pandas.DataFrame)
        ``dist_df`` with columns ``subsp``, ``median_dist`` and ``query`` (one
        row per subspecies that passed the filter), and ``short_output_df``
        with columns ``query``, ``subsp`` and ``median_dist`` (one row for the
        chosen subspecies). When no subspecies passes the filter both frames
        are empty but keep these columns, and a warning is logged.
    """
    # Loop-invariant: read the threshold once.
    min_hash = int(snakemake.config['min_hash'])
    query_hits = distance_df.loc[distance_df['query'] == query_MAG]

    subsp_dist = {}
    for subsp in query_hits.subspecies.unique():
        supported = query_hits.loc[
            (query_hits['subspecies'] == subsp)
            & (query_hits['hash_found'] > min_hash)
        ]
        if len(supported) > 0:
            subsp_dist[subsp] = supported['dist'].median()

    if not subsp_dist:
        # Previously this case raised a ValueError when column names were
        # assigned to a frame that had no columns. Return typed, empty tables
        # so the caller can still concatenate and write them.
        logger.warning(
            f"No group passed the min_hash filter for {query_MAG}, leaving it unassigned"
        )
        empty_full = pd.DataFrame(
            {
                'subsp': pd.Series(dtype='object'),
                'median_dist': pd.Series(dtype='float64'),
                'query': pd.Series(dtype='object'),
            }
        )
        return empty_full, empty_full[['query', 'subsp', 'median_dist']]

    dist_df = (
        pd.Series(subsp_dist, name='median_dist')
        .rename_axis('subsp')
        .reset_index()
    )
    dist_df['query'] = query_MAG

    chosen_subsp = dist_df.sort_values(by='median_dist', ascending=True).iloc[0]['subsp']
    chosen_dist = dist_df.loc[dist_df['subsp'] == chosen_subsp, 'median_dist'].iloc[0]
    short_output_df = pd.DataFrame(
        {
            'query': [query_MAG],
            'subsp': [chosen_subsp],
            'median_dist': [chosen_dist],
        }
    )

    logger.info(f"Finished assigning group for {query_MAG}")

    return dist_df, short_output_df

all_queries = distance_df['query'].unique().tolist()
logger.info('Starting assigning groups!')

# Collect the per-query tables and concatenate once at the end; concatenating
# inside the loop copies the growing table on every iteration.
full_tables = []
short_tables = []
for query in all_queries:
    dist_df, short_output_df = assign_group(query)
    full_tables.append(dist_df)
    short_tables.append(short_output_df)

if full_tables:
    output_df = pd.concat(full_tables)
    short_df = pd.concat(short_tables)
else:
    # No query at all: still write tables that carry the expected headers
    # instead of failing on the column reordering below.
    output_df = pd.DataFrame(columns=['subsp', 'median_dist', 'query'])
    short_df = pd.DataFrame(columns=['query', 'subsp', 'median_dist'])

# Move the last column ('query') to the front of the full table.
cols = list(output_df.columns)
cols = [cols[-1]] + cols[:-1]
output_df = output_df[cols]

output_df.to_csv(snakemake.output.full_output, sep='\t', index=False)

short_df.to_csv(snakemake.output.short_output, sep='\t', index=False)

logger.info('Finished assigning groups!')
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import os, glob
import pandas as pd
from collections import defaultdict

from snakemake.shell import shell
import logging, traceback

# Send this script's log records to the first log file listed in `snakemake.log`,
# at INFO level, with each message prefixed by a "YYYY-MM-DD HH:MM:SS" timestamp.
# NOTE(review): `snakemake` is not defined in this script — presumably it is
# injected by Snakemake when the file is run through a `script:` directive.
logging.basicConfig(
    filename=snakemake.log[0],
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Redirect messages issued through the `warnings` module to logging as well.
logging.captureWarnings(True)


def handle_exception(exc_type, exc_value, exc_traceback):
    """Log an uncaught exception; meant to be installed as ``sys.excepthook``.

    A ``KeyboardInterrupt`` (or subclass) is handed to the interpreter's
    original hook, ``sys.__excepthook__``, and is not logged. Any other
    exception is written through ``logging.error`` as a single message made of
    the text "Uncaught exception: " followed by the formatted traceback.

    Parameters
    ----------
    exc_type
        Class of the exception.
    exc_value
        The exception instance.
    exc_traceback
        Traceback object associated with the exception.
    """
    if not issubclass(exc_type, KeyboardInterrupt):
        traceback_lines = traceback.format_exception(
            exc_type, exc_value, exc_traceback
        )
        logging.error("Uncaught exception: " + "".join(traceback_lines))
    else:
        sys.__excepthook__(exc_type, exc_value, exc_traceback)


# Install exception handler
# NOTE(review): `sys`, `logger` and `snakemake` are never imported or defined in
# this script — presumably they come from the preamble Snakemake prepends to
# `script:` files; confirm before running the script any other way.
sys.excepthook = handle_exception

# Start script
# The per-MAG tables are read without a header line; these names are assigned
# to their five columns.
distance_columns = ['query', 'target', 'dist', 'p_value', 'hash_ratio']

# Read every non-empty table and concatenate once at the end; concatenating
# inside the loop copies the growing table on every iteration.
tables = []
for table_path in snakemake.input.all_input:
    if os.path.getsize(table_path) > 0:
        tables.append(
            pd.read_csv(table_path, sep='\t', names=distance_columns, header=None)
        )
    else:
        # MAG name used for the message: file name up to its first dot.
        MAG = table_path.split("/")[-1].split(".")[0]
        logger.warning(f"Table for MAG {MAG} is empty!")

if tables:
    output_df = pd.concat(tables, axis=0)
else:
    # Every input was empty: write a header-only table so the expected column
    # names are still present in the output file.
    output_df = pd.DataFrame(columns=distance_columns)

output_df.to_csv(snakemake.output.distance_df, sep='\t', index=False)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import os
import pandas as pd

from snakemake.shell import shell
import logging, traceback

# Send this script's log records to the first log file listed in `snakemake.log`,
# at INFO level, with each message prefixed by a "YYYY-MM-DD HH:MM:SS" timestamp.
# NOTE(review): `snakemake` is not defined in this script — presumably it is
# injected by Snakemake when the file is run through a `script:` directive.
logging.basicConfig(
    filename=snakemake.log[0],
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Redirect messages issued through the `warnings` module to logging as well.
logging.captureWarnings(True)


def handle_exception(exc_type, exc_value, exc_traceback):
    """Log an uncaught exception; meant to be installed as ``sys.excepthook``.

    A ``KeyboardInterrupt`` (or subclass) is handed to the interpreter's
    original hook, ``sys.__excepthook__``, and is not logged. Any other
    exception is written through ``logging.error`` as a single message made of
    the text "Uncaught exception: " followed by the formatted traceback.

    Parameters
    ----------
    exc_type
        Class of the exception.
    exc_value
        The exception instance.
    exc_traceback
        Traceback object associated with the exception.
    """
    if not issubclass(exc_type, KeyboardInterrupt):
        traceback_lines = traceback.format_exception(
            exc_type, exc_value, exc_traceback
        )
        logging.error("Uncaught exception: " + "".join(traceback_lines))
    else:
        sys.__excepthook__(exc_type, exc_value, exc_traceback)


# Install exception handler
# NOTE(review): `sys` and `snakemake` are never imported or defined in this
# script — presumably they come from the preamble Snakemake prepends to
# `script:` files; confirm before running the script any other way.
sys.excepthook = handle_exception

# Start script
os.makedirs(snakemake.output.batch_dir, exist_ok=True)

paths = snakemake.config["MAGS_path_table"]

# Only the 'path' and 'group' columns of the table are used.
paths_df = pd.read_csv(paths, sep="\t")[['path', 'group']]


# Write one batch file per group: a headerless, single-column list of the
# paths that belong to that group.
family_prefix = "f__"
for group in paths_df["group"].unique():
    family_df = paths_df.loc[paths_df["group"] == group]

    # Remove "f__" only when it really is a prefix. str.lstrip("f__") treats
    # its argument as a set of characters and would also eat leading 'f' and
    # '_' characters that belong to the name itself.
    if group.startswith(family_prefix):
        group = group[len(family_prefix):]

    batch_file = f"{snakemake.output.batch_dir}/{group}.tsv"
    family_df[["path"]].to_csv(batch_file, sep='\t', index=False, header=False)
    logging.info(f"Created batch file {batch_file}")
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import os
import pandas as pd

from snakemake.shell import shell
import logging, traceback

# Send this script's log records to the first log file listed in `snakemake.log`,
# at INFO level, with each message prefixed by a "YYYY-MM-DD HH:MM:SS" timestamp.
# NOTE(review): `snakemake` is not defined in this script — presumably it is
# injected by Snakemake when the file is run through a `script:` directive.
logging.basicConfig(
    filename=snakemake.log[0],
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Redirect messages issued through the `warnings` module to logging as well.
logging.captureWarnings(True)


def handle_exception(exc_type, exc_value, exc_traceback):
    """Log an uncaught exception; meant to be installed as ``sys.excepthook``.

    A ``KeyboardInterrupt`` (or subclass) is handed to the interpreter's
    original hook, ``sys.__excepthook__``, and is not logged. Any other
    exception is written through ``logging.error`` as a single message made of
    the text "Uncaught exception: " followed by the formatted traceback.

    Parameters
    ----------
    exc_type
        Class of the exception.
    exc_value
        The exception instance.
    exc_traceback
        Traceback object associated with the exception.
    """
    if not issubclass(exc_type, KeyboardInterrupt):
        traceback_lines = traceback.format_exception(
            exc_type, exc_value, exc_traceback
        )
        logging.error("Uncaught exception: " + "".join(traceback_lines))
    else:
        sys.__excepthook__(exc_type, exc_value, exc_traceback)


# Install exception handler
# NOTE(review): `sys`, `logger` and `snakemake` are never imported or defined in
# this script — presumably they come from the preamble Snakemake prepends to
# `script:` files; confirm before running the script any other way.
sys.excepthook = handle_exception

# Start script
os.makedirs(snakemake.output.query_dir, exist_ok=True)

df_path = snakemake.config['query_paths']
df = pd.read_csv(df_path, sep='\t')

# One pass over the table: each distinct 'Genome' value gets its own
# "<Genome>.tsv" holding all of its rows, provided the file named in the first
# row's 'genome_file' column exists. sort=False keeps the order in which the
# genomes first appear. groupby skips rows whose 'Genome' is missing (default
# dropna behaviour).
for MAG, temp_df in df.groupby("Genome", sort=False):
    path = temp_df['genome_file'].iloc[0]
    if os.path.exists(path):
        temp_df.to_csv(
            f"{snakemake.output.query_dir}/{MAG}.tsv", sep='\t', index=False
        )
    else:
        logger.warning(f"Path not found for {path}")
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import os, glob
import pandas as pd

from snakemake.shell import shell
import logging, traceback

# Send this script's log records to the first log file listed in `snakemake.log`,
# at INFO level, with each message prefixed by a "YYYY-MM-DD HH:MM:SS" timestamp.
# NOTE(review): `snakemake` is not defined in this script — presumably it is
# injected by Snakemake when the file is run through a `script:` directive.
logging.basicConfig(
    filename=snakemake.log[0],
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)

# Redirect messages issued through the `warnings` module to logging as well.
logging.captureWarnings(True)


def handle_exception(exc_type, exc_value, exc_traceback):
    """Log an uncaught exception; meant to be installed as ``sys.excepthook``.

    A ``KeyboardInterrupt`` (or subclass) is handed to the interpreter's
    original hook, ``sys.__excepthook__``, and is not logged. Any other
    exception is written through ``logging.error`` as a single message made of
    the text "Uncaught exception: " followed by the formatted traceback.

    Parameters
    ----------
    exc_type
        Class of the exception.
    exc_value
        The exception instance.
    exc_traceback
        Traceback object associated with the exception.
    """
    if not issubclass(exc_type, KeyboardInterrupt):
        traceback_lines = traceback.format_exception(
            exc_type, exc_value, exc_traceback
        )
        logging.error("Uncaught exception: " + "".join(traceback_lines))
    else:
        sys.__excepthook__(exc_type, exc_value, exc_traceback)


# Install exception handler
# NOTE(review): `sys`, `logger` and `snakemake` are never imported or defined in
# this script — presumably they come from the preamble Snakemake prepends to
# `script:` files; confirm before running the script any other way.
sys.excepthook = handle_exception

# Start script

df = pd.read_csv(snakemake.input.query_MAG, sep='\t')

# Only the first row of the query table is used.
first_row = df.iloc[0]
path = first_row['genome_file']
group = first_row['group']

reference_sketches = os.listdir("output/reference_sketches")

if f"{group}.sketch" not in reference_sketches:
    # No reference sketch for this group: create an empty result file so the
    # declared output still exists.
    logger.warning("Group was not found in reference! Skipping it . . .")
    shell(f"touch {snakemake.output.result}")

else:
    # Sketch the query genome into the temporary directory, then compute its
    # distances against the reference sketch of its group.
    shell(f"bindash sketch --nthreads={snakemake.threads} --kmerlen={snakemake.params.kmer_len} --outfname={snakemake.resources.tmpdir}/{snakemake.wildcards.MAG}.sketch {path} &> {snakemake.log}")
    shell(f"bindash dist --nthreads={snakemake.threads} --outfname={snakemake.output.result} {snakemake.resources.tmpdir}/{snakemake.wildcards.MAG}.sketch output/reference_sketches/{group}.sketch 2>> {snakemake.log}")
18
19
script:
    "scripts/create_batch_file.py"
37
38
shell:
    "bindash sketch --nthreads={threads} --kmerlen={params.kmer_len} --listfname={input.MAG_list} --outfname={output.sketch} &> {log}"
63
64
script:
    "scripts/query_MAGs.py"
82
83
script:
    "scripts/sketch_query.py"
104
105
script:
    "scripts/concat_queries.py"
122
123
script:
    "scripts/assign_group.py"
Show/Hide 10 more snippets with no or duplicated tags.

Login to post a comment if you would like to share your experience with this workflow.

Do you know this workflow well? If so, you can request seller status and start supporting this workflow.

Free

Created: 1yr ago
Updated: 1yr ago
Maintainers: public
URL: https://github.com/trickovicmatija/assign-MAGs
Name: assign-mags
Version: 1
Badge:
workflow icon

Insert copied code into your website to add a link to this workflow.

Downloaded: 0
Copyright: Public Domain
License: MIT License
  • Future updates

Related Workflows

cellranger-snakemake-gke
snakemake workflow to run cellranger on a given bucket using gke.
A Snakemake workflow for running cellranger on a given bucket using Google Kubernetes Engine. The usage of this workflow ...