Snakemake Workflow for Assigning New MAGs to Predefined Groups Based on Whole-Genome Similarity
A Snakemake workflow for assigning new MAGs to already-defined groups based on whole-genome similarity.
I suggest using this filtering workflow before running this one.
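The scripts in this workflow read a few entries from the Snakemake configuration: group_def, MAGS_path_table, query_paths and min_hash. As a rough, unofficial sketch of what those entries look like (key names are taken from the snakemake.config lookups in the scripts below; the paths and the min_hash value are placeholders):

# Hypothetical sketch of the config entries used by the scripts below
# (key names match the snakemake.config lookups; paths and values are placeholders).
config = {
    # TSV with "genome" and "subspecies" columns defining the reference groups
    "group_def": "config/group_definitions.tsv",
    # TSV with "path" and "group" columns listing the reference MAGs
    "MAGS_path_table": "config/reference_mags.tsv",
    # TSV with "Genome", "genome_file" and "group" columns for the query MAGs
    "query_paths": "config/query_mags.tsv",
    # Minimum number of shared hashes before a distance is used in the assignment
    "min_hash": 100,
}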
Code Snippets
import os, glob
import sys
import pandas as pd
from collections import defaultdict
from multiprocessing import Pool
from snakemake.shell import shell

import logging, traceback

logging.basicConfig(
    filename=snakemake.log[0],
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logging.captureWarnings(True)


def handle_exception(exc_type, exc_value, exc_traceback):
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return

    logging.error(
        "".join(
            [
                "Uncaught exception: ",
                *traceback.format_exception(exc_type, exc_value, exc_traceback),
            ]
        )
    )


# Install exception handler
sys.excepthook = handle_exception

# Start script
logging.info("Starting preparing input data!")

# Read the concatenated bindash distance table and split the "found/total" hash ratio
distance_df = pd.read_csv(snakemake.input.distance, sep="\t")
distance_df["hash_found"] = distance_df["hash_ratio"].str.split("/", expand=True)[0].astype("int")
distance_df["hash_total"] = distance_df["hash_ratio"].str.split("/", expand=True)[1].astype("int")
distance_df["hash_ratio"] = distance_df["hash_found"] / distance_df["hash_total"]

# Load the predefined group definitions and attach them to each target genome
group_def = pd.read_csv(snakemake.config["group_def"], sep="\t")
group_def.subspecies = group_def.subspecies.astype("str").str.rjust(10, "0")

distance_df["target_genome"] = (
    distance_df["target"]
    .str.rsplit("/", n=1, expand=True)[1]
    .str.rsplit(".", n=2, expand=True)[0]
)
distance_df = distance_df.merge(group_def, left_on="target_genome", right_on="genome", how="left")

logging.info("Finished preparing input data!")

output_df = pd.DataFrame()
short_df = pd.DataFrame()


def assign_group(query_MAG):
    """Assign one query MAG to the subspecies with the lowest median distance."""
    short_output_dict = defaultdict()
    temp_df = distance_df.loc[distance_df["query"] == query_MAG]

    # Median distance per subspecies, using only hits with enough shared hashes
    subsp_dist = defaultdict()
    for subsp in temp_df.subspecies.unique():
        df = temp_df.loc[temp_df["subspecies"] == subsp]
        df = df.loc[df["hash_found"] > int(snakemake.config["min_hash"])]
        if df.shape[0] > 0:
            subsp_dist[subsp] = df["dist"].median()

    dist_df = pd.DataFrame.from_dict(subsp_dist, orient="index")
    dist_df.columns = ["median_dist"]
    dist_df = dist_df.reset_index().rename(columns={"index": "subsp"})
    dist_df["query"] = query_MAG

    # The chosen group is the one with the smallest median distance
    chosen_subsp = dist_df.sort_values(by="median_dist", ascending=True).iloc[0]["subsp"]
    short_output_dict[query_MAG] = chosen_subsp

    short_output_df = pd.DataFrame.from_dict(short_output_dict, orient="index")
    short_output_df.columns = ["subsp"]
    short_output_df = short_output_df.reset_index().rename(columns={"index": "query"})
    short_output_df["median_dist"] = dist_df.loc[dist_df["subsp"] == chosen_subsp]["median_dist"].iloc[0]

    logging.info(f"Finished assigning group for {query_MAG}")
    return dist_df, short_output_df


all_queries = distance_df["query"].unique().tolist()

logging.info("Starting assigning groups!")
for query in all_queries:
    dist_df, short_output_df = assign_group(query)
    output_df = pd.concat([output_df, dist_df])
    short_df = pd.concat([short_df, short_output_df])

# Move the "query" column to the front before writing the full table
cols = list(output_df.columns)
cols = [cols[-1]] + cols[:-1]
output_df = output_df[cols]

output_df.to_csv(snakemake.output.full_output, sep="\t", index=False)
short_df.to_csv(snakemake.output.short_output, sep="\t", index=False)
logging.info("Finished assigning groups!")
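The assignment rule itself is simple: for each candidate group, keep only hits that share more than min_hash hashes with the query, take the median bindash distance of those hits, and pick the group with the lowest median. A small standalone illustration with made-up numbers (not part of the workflow):

# Standalone illustration of the assignment rule with invented values.
import pandas as pd

toy = pd.DataFrame(
    {
        "subspecies": ["A", "A", "B", "B"],
        "dist": [0.010, 0.012, 0.030, 0.028],
        "hash_found": [500, 450, 300, 20],  # last row is dropped with min_hash=100
    }
)
min_hash = 100
kept = toy.loc[toy["hash_found"] > min_hash]
medians = kept.groupby("subspecies")["dist"].median()
print(medians.idxmin())  # -> "A", the group with the lowest median distance

Using the median rather than the single closest hit makes the call less sensitive to one unusually close (or misassigned) reference genome.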
import os, glob
import sys
import pandas as pd
from collections import defaultdict
from snakemake.shell import shell

import logging, traceback

logging.basicConfig(
    filename=snakemake.log[0],
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logging.captureWarnings(True)


def handle_exception(exc_type, exc_value, exc_traceback):
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return

    logging.error(
        "".join(
            [
                "Uncaught exception: ",
                *traceback.format_exception(exc_type, exc_value, exc_traceback),
            ]
        )
    )


# Install exception handler
sys.excepthook = handle_exception

# Start script
# Concatenate the per-MAG bindash distance tables into a single table
output_df = pd.DataFrame()
for distance_df in snakemake.input.all_input:
    MAG = distance_df.split("/")[-1].split(".")[0]
    if os.path.getsize(distance_df) > 0:
        df = pd.read_csv(
            distance_df,
            sep="\t",
            names=["query", "target", "dist", "p_value", "hash_ratio"],
            header=None,
        )
        output_df = pd.concat([output_df, df], axis=0)
    else:
        logging.warning(f"Table for MAG {MAG} is empty!")

output_df.to_csv(snakemake.output.distance_df, sep="\t", index=False)
import os
import sys
import pandas as pd
from snakemake.shell import shell

import logging, traceback

logging.basicConfig(
    filename=snakemake.log[0],
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logging.captureWarnings(True)


def handle_exception(exc_type, exc_value, exc_traceback):
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return

    logging.error(
        "".join(
            [
                "Uncaught exception: ",
                *traceback.format_exception(exc_type, exc_value, exc_traceback),
            ]
        )
    )


# Install exception handler
sys.excepthook = handle_exception

# Start script
os.makedirs(snakemake.output.batch_dir, exist_ok=True)

paths = snakemake.config["MAGS_path_table"]
paths_df = pd.read_csv(paths, sep="\t")[["path", "group"]]

# Write one list of reference genome paths per group
groups = paths_df.group.unique()

for group in groups:
    family_df = paths_df.loc[paths_df["group"] == group]

    output_df = pd.DataFrame()
    output_df["F"] = family_df["path"].values.tolist()

    # Drop the leading "f__" prefix from the group name when naming the batch file
    group = group.lstrip("f__")
    output_df.to_csv(
        f"{snakemake.output.batch_dir}/{group}.tsv", sep="\t", index=False, header=False
    )
    logging.info(f"Created batch file {snakemake.output.batch_dir}/{group}.tsv")
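For reference, the table behind MAGS_path_table only needs a path and a group column; each group gets its own batch file, named after the group with the leading "f__" prefix removed. A hypothetical example of that table (paths and group name invented):

# Hypothetical example of the MAGS_path_table layout (paths and group invented).
import pandas as pd

reference_mags = pd.DataFrame(
    {
        "path": [
            "/data/refs/MAG_0001.fasta",
            "/data/refs/MAG_0002.fasta",
        ],
        "group": ["f__ExampleFamily", "f__ExampleFamily"],
    }
)
reference_mags.to_csv("reference_mags.tsv", sep="\t", index=False)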
import os
import sys
import pandas as pd
from snakemake.shell import shell

import logging, traceback

logging.basicConfig(
    filename=snakemake.log[0],
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logging.captureWarnings(True)


def handle_exception(exc_type, exc_value, exc_traceback):
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return

    logging.error(
        "".join(
            [
                "Uncaught exception: ",
                *traceback.format_exception(exc_type, exc_value, exc_traceback),
            ]
        )
    )


# Install exception handler
sys.excepthook = handle_exception

# Start script
os.makedirs(snakemake.output.query_dir, exist_ok=True)

# Write one single-row table per query MAG, skipping entries whose genome file is missing
df_path = snakemake.config["query_paths"]
df = pd.read_csv(df_path, sep="\t")

for MAG in df.Genome:
    temp_df = df.query("Genome == @MAG")
    path = temp_df["genome_file"].iloc[0]
    if os.path.exists(path):
        temp_df.to_csv(f"{snakemake.output.query_dir}/{MAG}.tsv", sep="\t", index=False)
    else:
        logging.warning(f"Path not found for {path}")
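Similarly, the query_paths table is expected to carry at least "Genome", "genome_file" and "group" columns; the per-MAG tables written here are read again by the sketching step below, which uses the "group" value to pick the matching reference sketch. A hypothetical example (values invented):

# Hypothetical example of the query_paths table layout (values invented).
import pandas as pd

queries = pd.DataFrame(
    {
        "Genome": ["query_MAG_1"],
        "genome_file": ["/data/queries/query_MAG_1.fasta"],
        "group": ["ExampleFamily"],  # must correspond to an existing reference sketch name
    }
)
queries.to_csv("query_mags.tsv", sep="\t", index=False)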
import os, glob
import sys
import pandas as pd
from snakemake.shell import shell

import logging, traceback

logging.basicConfig(
    filename=snakemake.log[0],
    level=logging.INFO,
    format="%(asctime)s %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logging.captureWarnings(True)


def handle_exception(exc_type, exc_value, exc_traceback):
    if issubclass(exc_type, KeyboardInterrupt):
        sys.__excepthook__(exc_type, exc_value, exc_traceback)
        return

    logging.error(
        "".join(
            [
                "Uncaught exception: ",
                *traceback.format_exception(exc_type, exc_value, exc_traceback),
            ]
        )
    )


# Install exception handler
sys.excepthook = handle_exception

# Start script
df = pd.read_csv(snakemake.input.query_MAG, sep="\t")
path = df.iloc[0]["genome_file"]
group = df.iloc[0]["group"]

# Only compute distances if a reference sketch exists for this MAG's group
if f"{group}.sketch" in os.listdir("output/reference_sketches"):
    # Sketch the query genome, then compute distances against the group's reference sketch
    shell(
        f"bindash sketch --nthreads={snakemake.threads} --kmerlen={snakemake.params.kmer_len} "
        f"--outfname={snakemake.resources.tmpdir}/{snakemake.wildcards.MAG}.sketch {path} &> {snakemake.log}"
    )
    shell(
        f"bindash dist --nthreads={snakemake.threads} --outfname={snakemake.output.result} "
        f"{snakemake.resources.tmpdir}/{snakemake.wildcards.MAG}.sketch "
        f"output/reference_sketches/{group}.sketch 2>> {snakemake.log}"
    )
else:
    logging.warning("Group was not found in reference! Skipping it . . .")
    shell(f"touch {snakemake.output.result}")
script:
    "scripts/create_batch_file.py"

shell:
    "bindash sketch --nthreads={threads} --kmerlen={params.kmer_len} --listfname={input.MAG_list} --outfname={output.sketch} &> {log}"

script:
    "scripts/query_MAGs.py"

script:
    "scripts/sketch_query.py"

script:
    "scripts/concat_queries.py"

script:
    "scripts/assign_group.py"
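For orientation, the assignment script plugs into a rule shaped roughly like the sketch below. The input, output and log names mirror the snakemake.input/output/log attributes used in assign_group.py; the file paths are placeholders, not the published rule.

rule assign_group:
    input:
        distance="output/all_distances.tsv",          # placeholder path
    output:
        full_output="output/assignments_full.tsv",    # placeholder path
        short_output="output/assignments_short.tsv",  # placeholder path
    log:
        "logs/assign_group.log",
    script:
        "scripts/assign_group.py"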
Created: 1yr ago
Updated: 1yr ago
Maintainers: public
URL: https://github.com/trickovicmatija/assign-MAGs
Name: assign-mags
Version: 1
Downloaded: 0
Copyright: Public Domain
License: MIT License