Snakemake workflow to map and quantify metatranscriptomic data using a metagenome reference


A Snakemake workflow for mapping metatranscriptomic reads against a metagenomic reference.

Overview

In short, the workflow maps paired-end input reads against metagenomic assemblies and performs read counting and normalization both per ORF and per annotated feature.

Usage

Required files are:

  1. Assembled fasta files named final_contigs.fa and GFF files named final_contigs.gff under each assembly subdirectory in the assembly_dir specified in the config file. Each assembly to analyse should be listed in a text file given by assembly_list in the config file.

  2. Paths to paired-end fastq files (preprocessed as necessary) specified in a tab-delimited text file (given by sample_list in the config file).

  3. An annotation directory for each assembly (given by annotation_dir) where each assembly has its own subdirectory containing tab-delimited annotation files in the form <annotation_dir>/<assembly>/<db>.parsed.tsv, where db can be for instance 'pfam', 'enzymes', 'kos', 'modules', 'pathways' etc.

Example setup

A typical yaml-format configuration file, by default located at config/config.yml, may look like the following:

sample_list: "samples.tsv"
assembly_list: "assemblies.tsv"
assembly_dir: "data/assembly"
annotation_dir: "data/annotation"
featurecounts:
 settings: "-t CDS -g ID -M -Q 10 -B -p"
bowtie2:
 settings: "--very-sensitive"
annotation_dbs:
 - "pfam"
 - "protein-models"
 - "enzymes"
 - "kos"
 - "modules"
 - "pathways"
norm_methods:
 - "CSS"
 - "TMM"
 - "RLE"

An example of what the samples.tsv file can look like is:

sample unit fq1 fq2
sample1 1 /proj/data/sample1_R1.fastq.gz /proj/data/sample1_R2.fastq.gz
sample2 1 /proj/data/sample2_R1.fastq.gz /proj/data/sample2_R2.fastq.gz
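
Judging from the parse_samples function in the Python script listed under Code Snippets below, the samples file may also contain an optional assembly column that maps each sample to a specific assembly; if the column is absent, the assembly defaults to the sample name. A hypothetical example with the extra column:

sample unit fq1 fq2 assembly
sample1 1 /proj/data/sample1_R1.fastq.gz /proj/data/sample1_R2.fastq.gz assembly1
sample2 1 /proj/data/sample2_R1.fastq.gz /proj/data/sample2_R2.fastq.gz assembly1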

The file assemblies.tsv lists what assemblies to analyse and only contains each assembly name, one per row:

assembly
assembly1
assembly2

Each assembly, here assembly1 and assembly2, must have a dedicated subdirectory under the path given by the assembly_dir parameter, in this case data/assembly/, meaning that the following should exist:

data/assembly/assembly1
└── final_contigs.fa
data/assembly/assembly2
└── final_contigs.fa

in order for the workflow to work.

Furthermore, in order for the workflow to be able to sum transcripts to the level of annotated features, such features must be supplied for the metagenomic assembly under the path given by annotation_dir. These annotation files should be tab-delimited, with the open reading frame (ORF) identifier in the first column and subsequent columns dedicated to feature IDs and/or descriptions. As an example, the output from running pfam_scan on protein sequences against the PFAM database, found in the file pfam.parsed.tsv, may look like this:

orf pfam pfam_name clan clan_name
k141_100000_1 PF01546 Peptidase family M20/M25/M40 CL0035 Peptidase clan MH/MC/MF
k141_100003_2 PF01368 DHH CL0137 HAD superfamily
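
To illustrate the summation step, the following is a minimal pandas sketch of how per-ORF counts can be aggregated to the feature level using such an annotation table. The file paths are illustrative only; the workflow's actual implementation is the process_and_sum/sum_to_features functions in the Python script listed under Code Snippets below.

import pandas as pd

# illustrative paths: per-ORF counts and parsed pfam annotations
counts = pd.read_csv("results/assembly1/counts.tsv", sep="\t", index_col=0)
annot = pd.read_csv("data/annotation/assembly1/pfam.parsed.tsv", sep="\t", index_col=0)

# drop the gene length column so only per-sample counts remain
counts = counts.drop(columns=["Length"], errors="ignore")

# join annotations onto counts; ORFs without a pfam hit become "Unclassified"
merged = pd.merge(annot, counts, left_index=True, right_index=True, how="right")
merged = merged.fillna("Unclassified")

# sum counts over all annotation columns (pfam, pfam_name, clan, clan_name)
# and index the result by the first feature column
feature_cols = list(annot.columns)
pfam_counts = merged.groupby(feature_cols).sum().reset_index().set_index(feature_cols[0])
pfam_counts.to_csv("pfam.parsed.counts.tsv", sep="\t")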

With this setup, metatranscriptomic reads from each sample will be mapped to each metagenomic assembly using bowtie2, and reads assigned to each ORF will be counted using featureCounts from the subread package. These read counts are then summed to the level of PFAM IDs and normalized.

Example command

To run the workflow with the above setup, use the following command:

snakemake --use-conda --configfile config/config.yml -j 100 -rpk --profile slurm

The --profile slurm flag instructs the workflow to submit jobs to the SLURM workload manager, using runtimes and thread counts according to the rule-specific settings. The only thing you need to change for this to work is the account: setting in the config/cluster.yaml file, which should be updated with your account id (e.g. snicXXXX-Y-ZZ):

__default__:
 account: staff

Output

Using the setup above, expected output for assembly1 is:

results/assembly1
├── count
│   ├── sample1_1.fc.tsv
│   ├── sample1_1.fc.tsv.summary
│   ├── sample2_1.fc.tsv
│   └── sample2_1.fc.tsv.summary
├── counts.tsv
├── pfam.parsed.counts.tsv
├── pfam.parsed.CSS.tsv
├── pfam.parsed.RLE.tsv
├── pfam.parsed.TMM.tsv
├── report.html
└── rpkm.tsv

Under the count/ subdirectory the raw output from featureCounts is located (count/sample1_1.fc.tsv etc.).

The file counts.tsv contains assigned read counts for each ORF in each mapped sample, while rpkm.tsv has read counts normalized to reads per kilobase per million mapped reads (RPKM). The report.html file contains a multiqc report summarizing mapping and read assignment metrics.
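
As a rough illustration of the RPKM normalization (the workflow itself computes RPKM with edgeR's rpkm() function, see scripts/edger.R under Code Snippets below), the value for an ORF is its count divided by the ORF length in kilobases and by the per-sample library size in millions of assigned reads:

import pandas as pd

# counts.tsv has one row per ORF with a Length column plus one column per sample
df = pd.read_csv("results/assembly1/counts.tsv", sep="\t", index_col=0)
length_kb = df["Length"] / 1e3
counts = df.drop(columns=["Length"])

# per-sample library size in millions of assigned reads
lib_size_millions = counts.sum() / 1e6

# RPKM = count / (ORF length in kb * library size in millions)
rpkm = counts.div(length_kb, axis=0).div(lib_size_millions, axis=1)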

All files starting with pfam. contain either raw (pfam.parsed.counts.tsv) or normalized read counts for annotated PFAMs.

Collated output

Collated tables of metatranscriptomic counts and normalized values are written to tables/. However, only samples with a matching metagenomic assembly listed in the assembly_list file, and with a corresponding final_contigs.fa under a subdirectory of assembly_dir, will have values in these files, since they only show abundances for features where a metatranscriptomic sample could be linked directly to its metagenomic assembly.

As an example, in a setup where:

  • the config file contains:
assembly_dir: "data/assemblies"
assembly_list: "assemblies.txt"
sample_list: "samples.tsv"
  • a metatranscriptomic sample named sample1 is specified in samples.tsv, and

  • there is a metagenomic assembly at data/assemblies/sample1/final_contigs.fa

then sample1 will also have values in the collated files under tables/. For a metatranscriptomic sample, e.g. sample2, that is listed in samples.tsv but lacks a corresponding metagenomic assembly, counts and normalized values for features in the other existing assemblies will instead only be found in the per-assembly results, e.g. results/sample1/kos.parsed.rpkm.tsv etc.
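
In other words, when the collated tables are built, a sample only contributes a column if its assembly is both listed and assembled. A minimal sketch of that rule, mirroring get_sample_counts/extract_counts in the Python script listed under Code Snippets below (sample names and file paths are illustrative):

import pandas as pd

# toy stand-ins for the dictionaries returned by parse_samples/parse_assemblies
samples = {"sample1_1": {"assembly": "sample1"}, "sample2_1": {"assembly": "sample2"}}
assemblies = {"sample1": ""}  # only sample1 has an assembled metagenome

columns = {}
for sample, info in samples.items():
    assembly = info["assembly"]
    if assembly not in assemblies:
        continue  # sample2_1 is skipped: it gets no column in the collated table
    # per-assembly result file, here normalized KO abundances
    df = pd.read_csv(f"results/{assembly}/kos.parsed.rpkm.tsv", sep="\t", index_col=0)
    columns[sample] = df[sample]

collated = pd.DataFrame(columns).fillna(0)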

Code Snippets

import pandas as pd
import os
from collections import defaultdict


def parse_samples(f):
    df = pd.read_csv(f, sep="\t", index_col=0)
    samples = defaultdict(lambda: defaultdict(dict))
    for sample, d in df.iterrows():
        sample_id = f"{sample}_{d['unit']}"
        samples[sample_id]["fq1"] = d["fq1"]
        samples[sample_id]["fq2"] = d["fq2"]
        # If no assembly is set in the samples file, assign assembly
        # using sample
        try:
            samples[sample_id]["assembly"] = d["assembly"]
        except KeyError:
            samples[sample_id]["assembly"] = sample
    return samples


def parse_assemblies(f, assembly_dir):
    df = pd.read_csv(f, sep="\t", index_col=0, header=None)
    d = df.to_dict(orient="index")
    assemblies = {}
    for assembly in d.keys():
        if os.path.exists(f"{assembly_dir}/{assembly}/final_contigs.fa"):
            assemblies[assembly] = ""
    return assemblies


def clean_featurecount(sm):
    dataf = pd.DataFrame()
    for f in sm.input.tsv:
        # sample id is taken from the file name, e.g. sample1_1.fc.tsv -> sample1_1
        sample = os.path.basename(f).replace(".fc.tsv", "")
        # read the Geneid, Chr, Length and count columns from the featureCounts table
        df = pd.read_csv(f, sep="\t", comment="#", usecols=[0, 1, 5, 6])
        df.columns = ["Geneid", "Chr", "Length", sample]
        # index on <contig>_<orf number> to get a unique ORF id per row
        df.index = df.Chr.map(str) + ["_" + x.split("_")[-1] for x in df.Geneid]
        df.drop(["Geneid", "Chr"], axis=1, inplace=True)
        dataf = pd.merge(dataf, df, left_index=True, right_index=True, how="outer")
        # after the first merge there are two Length columns; keep only one
        try:
            dataf.drop("Length_y", axis=1, inplace=True)
        except KeyError:
            continue
        dataf.rename(columns={"Length_x": "Length"}, inplace=True)
    dataf.to_csv(sm.output.tsv, sep="\t")


def process_and_sum(q_df, annot_df):
    # Merge annotations and abundance
    # keep ORFs without annotation as "Unclassified"
    annot_q_df = pd.merge(annot_df, q_df, left_index=True, right_index=True,
                          how="right")
    annot_q_df.fillna("Unclassified", inplace=True)
    feature_cols = annot_df.columns
    annot_q_sum = annot_q_df.groupby(list(feature_cols)).sum().reset_index()
    annot_q_sum.set_index(feature_cols[0], inplace=True)
    return annot_q_sum


def sum_to_features(abundance, parsed):
    parsed_df = pd.read_csv(parsed, index_col=0, sep="\t")
    abundance_df = pd.read_csv(abundance, index_col=0, sep="\t")
    abundance_df.drop("Length", axis=1, inplace=True, errors="ignore")
    feature_sum = process_and_sum(abundance_df, parsed_df)
    return feature_sum


def count_features(sm):
    """
    Counts reads mapped to features such as KOs, PFAMs etc.

    :param sm:
    :return:
    """
    feature_sum = sum_to_features(sm.input.abund, sm.input.annot[0])
    feature_sum.to_csv(sm.output[0], sep="\t")


def get_sample_counts(samples, assemblies, db, c):
    """
    Sub-function for iterating each sample and looking up the corresponding
    count from its assembly

    :param samples:
    :param db:
    :return:
    """
    d = {}
    annots_dict = {}
    index_col = 0
    if db=="modules":
        index_col=4
    for sample in samples.keys():
        assembly = samples[sample]["assembly"]
        if assembly not in assemblies:
            continue
        f = f"results/{assembly}/{db}.parsed.{c}.tsv"
        df = pd.read_csv(f, sep="\t", index_col=index_col)
        # Extract annotation columns
        annots = df.loc[:, df.dtypes == object]
        annots_dict.update(annots.to_dict(orient="index"))
        d[sample] = df.loc[:, sample]
    counts_df = pd.DataFrame(d)
    counts_df.fillna(0, inplace=True)
    counts_df = pd.merge(pd.DataFrame(annots_dict).T, counts_df, right_index=True, left_index=True)
    return counts_df


def extract_counts(sm):
    """
    This function extracts counts of features for each sample from its
    corresponding single-sample assembly.

    :param sm:
    :return:
    """
    samples = parse_samples(sm.params.sample_list)
    assemblies = parse_assemblies(sm.params.assembly_list, sm.params.assembly_dir)
    db = sm.wildcards.db
    c = sm.wildcards.c
    counts_df = get_sample_counts(samples, assemblies, db, c)
    counts_df.index.name = db
    counts_df.to_csv(sm.output[0], sep="\t")


def marker_gene_norm(sm):
    df = pd.read_csv(sm.input[0], sep="\t", index_col=0)
    info_df = df.loc[:, df.dtypes==object]
    norm_models = sm.params.norm_models
    norm_df = df.loc[df.index.intersection(norm_models)]
    df_sum = df.groupby(level=0).sum()
    norm_median = norm_df.groupby(level=0).sum().median()
    df_norm = df_sum.div(norm_median)
    df_norm = pd.merge(info_df, df_norm, left_index=True, right_index=True)
    df_norm.to_csv(sm.output[0], sep="\t")


def main(sm):
    toolbox = {"clean_featurecount": clean_featurecount,
               "count_features": count_features,
               "extract_counts": extract_counts,
               "marker_gene_norm": marker_gene_norm}
    toolbox[sm.rule](sm)


if __name__ == "__main__":
    main(snakemake)
log <- file(snakemake@log[[1]], open="wt")
sink(log)
sink(log, type="message")
source("workflow/scripts/common.R")

library(edgeR)
method <- snakemake@params$method
input <- snakemake@input[[1]]
output <- snakemake@output[[1]]
if (is.null(snakemake@wildcards$db)) {
    db = ""
} else {
    db <- snakemake@wildcards$db
}
print(db)

# Read the counts
if (db == "modules") {
    x <- read.delim(input, sep = "\t", header = TRUE, check.names = FALSE)
    if ( db%in%colnames(x) ) {
        rownames(x) <- x$modules
        x <- subset(x, select=-c(modules))
    } else {
        rownames(x) <- x$module
        x <- subset(x, select=-c(module))
    }
} else {
    x <- read.delim(input, row.names = 1, sep = "\t", header = TRUE, check.names = FALSE)
}
# Get sample names
sample_names <- colnames(x)[unlist(lapply(x, is.numeric))]
# Extract row names
xrownames <- row.names(x)[row.names(x)!="Unclassified"]
# Get info names
info_names <- colnames(x)[unlist(lapply(x, is.character))]
# Remove unclassified
if ("Unclassified" %in% row.names(x)){
    x <- x[row.names(x)!="Unclassified", ]
}

x <- as.data.frame(x, row.names=xrownames)
colnames(x) <- append(info_names, sample_names)

x_num <- process_data(x, output)

if (method %in% c("TMM", "RLE")) {
    # Create DGE
    obj <- DGEList(x_num)
    # Calculate norm factors
    obj <- calcNormFactors(obj, method = method)
    # Calculate cpms
    norm <- cpm(obj, normalized.lib.sizes = TRUE)
} else if (method == "RPKM") {
    # Extract gene length column
    gene_length <- x_num$Length
    x_num <- x_num[, colnames(x_num) != "Length"]
    obj <- DGEList(x_num)
    # Calculate norm factors
    obj <- calcNormFactors(obj, method = "TMM")
    # Calculate RPKM
    norm <- rpkm(obj, normalized.lib.sizes = TRUE, gene.length = gene_length)
}

# Add info columns back
norm <- cbind(str_cols(x), norm)
if (length(info_names)>0) {
    colnames(norm) <- append(info_names, sample_names)
}
# Convert to numeric
norm <- as.data.frame(norm)
row.names(norm) <- xrownames

write.table(x = norm, file = output, quote = FALSE, sep="\t")
From scripts/edger.R
log <- file(snakemake@log[[1]], open="wt")
sink(log)
sink(log, type="message")

library(metagenomeSeq)
source("workflow/scripts/common.R")

method <- snakemake@params$method
input <- snakemake@input[[1]]
output <- snakemake@output[[1]]
normalize <- TRUE
db <- snakemake@wildcards$db

# Read the counts
if (db == "modules") {
    x <- read.delim(input, sep = "\t", header = TRUE, check.names = FALSE)
    if ( db%in%colnames(x) ) {
        rownames(x) <- x$modules
        x <- subset(x, select=-c(modules))
    } else {
        rownames(x) <- x$module
        x <- subset(x, select=-c(module))
    }
} else {
    x <- read.delim(input, row.names = 1, sep = "\t", header = TRUE, check.names = FALSE)
}

# Get sample names
sample_names <- colnames(x)[unlist(lapply(x, is.numeric))]
# Extract row names
xrownames <- row.names(x)[row.names(x)!="Unclassified"]
# Get info names
info_names <- colnames(x)[unlist(lapply(x, is.character))]

# Remove unclassified
if ("Unclassified" %in% row.names(x)){
    x <- x[row.names(x)!="Unclassified", ]
}

x <- as.data.frame(x, row.names=xrownames)
colnames(x) <- append(info_names, sample_names)

# Returns a vector in the case of 1 sample only
x_num <- process_data(x, output)

# If only one sample, set normalize=FALSE
if (length(sample_names) == 1) {
    print("ONLY ONE SAMPLE! WILL NOT RUN CSS")
    normalize <- FALSE
    x_num <- as.data.frame(x_num, row.names = rownames(x))
    colnames(x_num) <- sample_names
}

# Turn data into new experimentobject
obj <- newMRexperiment(x_num)
too_few_features <- FALSE
# In cases with only one sample, check whether there are enough features to run
# CSS
if (is.null(ncol(x_num))) {
    if (length(x_num) <= 1) {
        too_few_features <- TRUE
    }
} else { # Do the same type of check for many samples
    smat <- lapply(1:ncol(x_num), function(i) {
        sort(x_num[which(x_num[, i]>0),i], decreasing = TRUE)
    })
    if (any(sapply(smat,length)==1)) {
        too_few_features <- TRUE
    }
}
if (too_few_features == TRUE) {
    fh <-file(snakemake@output[[1]])
    writeLines(c("WARNING: Sample with one or zero features",
                 "Cumulative Sum Scaling failed for sample"), fh)
    close(fh)
    quit()
}

# Normalize
norm <- MRcounts(obj, norm = normalize)

# Add info columns back
norm <- cbind(str_cols(x), norm)
colnames(norm) <- append(info_names, sample_names)

# Convert to numeric
norm <- as.data.frame(norm)

# Set sample names
colnames(norm)[unlist(lapply(norm, is.numeric))] <- sample_names

# Write output
write.table(x = norm, file = output, quote = FALSE, sep="\t")
shell:
    """
    gzip -c {input} > {output}
    """
shell:
    """
    bowtie2-build \
        --large-index \
        --threads {threads} \
        {params.prefix} \
        {params.prefix} > /dev/null 2>&1
    """
shell:
    """
    bowtie2 {params.setting} -1 {input.fq1} -2 {input.fq2} -p {threads} -x {params.index} 2> {log} | \
        samtools view -bh - | samtools sort - -o {params.temp_bam}
    samtools index {params.temp_bam}
    mv {params.temp_bam} {output.bam}
    mv {params.temp_bam}.bai {output.bai}
    """
shell:
    """
    mkdir -p {params.tmpdir}
    featureCounts {params.setting} -a {input.gff} -o {output.tsv} \
        -T {threads} --tmpDir {params.tmpdir} {input.bam} > {log} 2>&1
    """
script: "scripts/common.py"
script:
    "scripts/common.py"
shell:
    """
    echo {input} | tr ' ' '\n' > filelist
    multiqc --cl-config 'extra_fn_clean_exts: [".bt2.log"]' -n report.html -o {params.outdir} -f -l filelist > {log} 2>&1
    """
script:
    "scripts/edger.R"
script:
    "scripts/common.py"
script:
    "scripts/common.py"
script:
    "scripts/edger.R"
script:
    "scripts/metagenomeseq.R"

URL: https://github.com/johnne/map_metaT
Name: map_metat
Version: v0.1.1
License: MIT License