INSaFLU Snakemake

Public · 1 year ago · Version: v1.0.0

Create Environment

Install Mambaforge:

curl -L https://github.com/con
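
A minimal sketch of the remaining setup, assuming the standard Mambaforge installer from the conda-forge/miniforge releases and a Conda environment file shipped with the workflow (the installer URL pattern, environment file path, and environment name below are assumptions, not taken from this page):

# Download and run the Mambaforge installer for your OS/architecture
# (URL pattern follows the conda-forge/miniforge release assets)
curl -L -O "https://github.com/conda-forge/miniforge/releases/latest/download/Mambaforge-$(uname)-$(uname -m).sh"
bash "Mambaforge-$(uname)-$(uname -m).sh" -b

# Create and activate the workflow environment (file path and name are assumptions)
mamba env create -f config/insaflu_env.yaml
mamba activate insaflu_snakemake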

Code Snippets

17
18
19
shell:
    "abricate {params} {input} > {output.csv}"
    " && python3 {scripts_directory}get_abricate_info_list.py {output.csv} {output.yaml}"
38
39
40
shell:
    "abricate {params} {input} > {output.csv}"
    " && python3 {scripts_directory}get_abricate_info_list.py {output.csv} {output.yaml}"
59
60
61
shell:
    "abricate {params} {input} > {output.csv}"
    " && python3 {scripts_directory}get_abricate_info_list.py {output.csv} {output.yaml}"
19
20
shell:
    "fastqc {input} -o {output.dir} {params} && python3 {scripts_directory}move_fastqc_output.py {wildcards.sample}"
41
42
shell:
    "fastqc {input} -o {output.dir} {params} && python3 {scripts_directory}move_fastqc_output.py {wildcards.sample}"
64
65
shell:
    "fastqc {input.read_1} {input.read_2} -o {output.dir} {params} -t {threads}"
85
86
shell:
    "fastqc {input.read} -o {output.dir} {params} -t {threads}"
18
19
shell:
    "fasttree {params} {input} > {output.tree} && cp {output.tree} {output.nwk}"
38
39
shell:
    "fasttree {params} {input} > {output.tree} && cp {output.tree} {output.nwk}"
57
58
shell:
    "fasttree {params} {input} > {output}"
76
77
78
shell:
    "cp {input.tree} {output.tree} &&"
    "cp {input.tree} {output.nwk}"
33
34
shell:
    "freebayes {params} -f {input.ref} {input.samples} > {output}"
25
26
27
shell:
  "python {coverage_script} -i {input.depth} -r {REFERENCE_FASTA}"
   " -o {output.coverage} -t {params.threshold} -g {REFERENCE_GB}"
54
55
shell:
    "python {scripts_directory}merge_coverage.py '{input}' {output.coverage_regular} {output.coverage_translate} {REFERENCE_GB}"
18
19
20
21
22
23
24
shell:
    "mkdir align_samples/{wildcards.sample}/reference -p && "
    "cp {REFERENCE_FASTA} align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta && "
    "bwa index align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta && "
    "bwa mem align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta {input.reads_1} {input.reads_2} | "
    "samtools view -u -T align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta -q {params.minqual} | "
    "samtools sort --reference align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta > {output}"
42
43
44
45
46
47
48
shell:
    "mkdir align_samples/{wildcards.sample}/reference -p && "
    "cp {REFERENCE_FASTA} align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta && "
    "bwa index align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta && "
    "bwa mem align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta {input.reads_1} | "
    "samtools view -u -T align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta -q {params.minqual} | "
    "samtools sort --reference align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta > {output}"
69
70
71
72
73
74
shell:
    "bwa mem -k 5 -T 16 align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta {PRIMER_FASTA} | samtools view -b -F 4 > {params.folder}primers.bam"
    " && bedtools bamtobed -i {params.folder}primers.bam > {params.folder}primers.bed &&"
    "samtools sort -o {input.pre_snps}.sorted {input.pre_snps} "
    "&& ivar trim -b {params.folder}primers.bed -i {input.pre_snps}.sorted -m 0 -q 0 -p {output} -e "
    "&& samtools sort -o {output} {output}"
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
shell:
    " ivar trim -m 0 -q 0 -e -b {params.folder}primers.bed -p {params.prefix}.trimmed -i {input.bam}"
    " && samtools sort -o {params.prefix}.trimmed.sorted.bam {params.prefix}.trimmed.bam"
    " && samtools index {params.prefix}.trimmed.sorted.bam"
    " && samtools mpileup -A -d 0 -Q 0 {input.bam} | ivar consensus -m 0 -n N -p {params.prefix}.ivar_consensus"
    " && ./../workflow/scripts/run_check_consensus {params.prefix}.ivar_consensus.fa align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta"
    " && bwa index -p {params.prefix}.ivar_consensus {params.prefix}.ivar_consensus.fa"
    " && bwa mem -k 5 -T 16 {params.prefix}.ivar_consensus {PRIMER_FASTA} | samtools view -bS -F 4 | samtools sort -o {params.folder}primers_consensus.bam"
    " && samtools mpileup -A -d 0 --reference {params.prefix}.ivar_consensus.fa -Q 0 {params.folder}primers_consensus.bam | ivar variants -p {params.folder}primers_consensus -t 0.03"
    " && bedtools bamtobed -i {params.folder}primers_consensus.bam > {params.folder}primers_consensus.bed"
    " && ivar getmasked -i {params.folder}primers_consensus.tsv -b {params.folder}primers_consensus.bed -f {PRIMER_FASTA}.pair_information.tsv -p {params.folder}primer_mismatchers_indices"
    " && ivar removereads -i {params.prefix}.trimmed.sorted.bam -p {params.prefix}.masked.bam -t {params.folder}primer_mismatchers_indices.txt -b {params.folder}primers.bed"
    " && samtools sort -o {params.prefix}.masked.sorted.bam {params.prefix}.masked.bam"
    " && cp {params.prefix}.masked.sorted.bam {output}"
    " && samtools index {output}"
127
128
129
shell:
    "samtools mpileup -A -d 600000 -B -Q 0 {input.bam} --reference align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta | "
    "ivar variants -p align_samples/{wildcards.sample}/iVar/snps -q {params.mapqual} -t {params.minfrac} -m {params.mincov}  align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta "
150
151
152
shell:
    "samtools mpileup -d 0 -A -Q {params.mapqual} {input.bam} --reference align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta | "
    "ivar consensus -p align_samples/{wildcards.sample}/iVar/{params.consensus} -q {params.mapqual} -t {params.minfrac} -n N -m {params.mincov} "
175
176
177
shell:
    "samtools depth {params} {input.i} | bgzip -c > {output.only_depth} "
    "&& tabix -p vcf {output.only_depth} && gunzip -c {output.only_depth}  > {output.depth} "
194
195
shell:
    "python ../workflow/scripts/split_consensus.py {input.consensus} {input.depth} {REFERENCE_GB} {output.consensus}"
SnakeMake From line 194 of rules/iVar.smk
211
212
shell:
    "python ../workflow/scripts/split_depth_file.py {input.zipped} {REFERENCE_GB}"
SnakeMake From line 211 of rules/iVar.smk
228
229
shell:
    "python ../workflow/scripts/mask_consensus_by_deep.py align_samples/{wildcards.sample}/reference/{REFERENCE_NAME}.fasta {input.first_consensus} {output.align_file} {wildcards.seg}"
SnakeMake From line 228 of rules/iVar.smk
248
249
shell:
    "mafft --thread {threads} {params} {input.align_file} > {output.aligned_file}"
268
269
shell:
    "python ../workflow/scripts/msa_masker.py -i {input.align_file} -df {input.depth} -o {output} {params}"
SnakeMake From line 268 of rules/iVar.smk
289
290
shell:
    "python ../workflow/scripts/get_consensus_medaka.py '{input}' {output}"
SnakeMake From line 289 of rules/iVar.smk
310
311
shell:
    "python {scripts_directory}mask_regions.py {input.consensus} {output.final_consensus} {params} "
SnakeMake From line 310 of rules/iVar.smk
323
324
shell:
    "python {scripts_directory}convert_vcf.py {input.tsv_file} {output.vcf_file} "
SnakeMake From line 323 of rules/iVar.smk
21
22
shell:
    "mafft --thread {threads} {params} {input.align_file} > {output.aligned_file}"
41
42
shell:
    "mafft --thread {threads} {params} {input.align_file} > {output.aligned_file}"
61
62
shell:
    "mafft --thread {threads} {params} {input} > {output}"
81
82
shell:
    "mafft --thread {threads} {params} {input} > {output}"
101
102
shell:
    "mafft --thread {threads} {params} {input} > {output}"
121
122
shell:
    "mafft --thread {threads} {params} {input} > {output}"
138
139
shell:
    "cp {input} {output}"
SnakeMake From line 138 of rules/mafft.smk
13
14
shell:
    "mkdir {output} && cp {software_parameters_path} projects/{wildcards.project}/"
34
35
36
shell:
    "mkdir projects/{wildcards.project}/sample_{wildcards.sample}/ -p && "
    " cp -r {params} projects/{wildcards.project}/sample_{wildcards.sample}/ "
60
61
62
shell:
    "python {scripts_directory}generate_AllConsensus.py {input.coverage} {REFERENCE_GB} '{input.every_consensus}' {REFERENCE_FASTA} {output.AllConsensus} {output.all_consensus_no_ref} "
    "&& python {scripts_directory}concat_segments.py '{input.every_consensus}' {REFERENCE_GB} {output.All_nt} {input.coverage} {REFERENCE_FASTA} {output.All_nt_only_90plus}"
89
90
shell:
    "python {scripts_directory}split_files_by_locus.py {input} projects/{PROJECT_NAME}/main_result {REFERENCE_GB}"
21
22
23
shell:
    "rm -r align_samples/{wildcards.sample}/medaka/ && mkdir align_samples/{wildcards.sample}/medaka/   && cp {input.ref} {output.ref_out} && medaka_consensus -i {input.i} -d {output.ref_out} -o align_samples/{wildcards.sample}/medaka -t {threads} -m r941_min_high_g360"
    " && cp {output.i} {output.i2}"
43
44
45
shell:
    "samtools depth {params} {input.i} | bgzip -c > {output.only_depth} "
    "&& tabix -p vcf {output.only_depth} && gunzip -c {output.only_depth}  > {output.depth} "
61
62
shell:
    "python3 {scripts_directory}split_depth_file.py {input} {REFERENCE_GB}"
80
81
shell:
    "medaka variant --verbose {input.ref} {input.hdf} {output.vcf}"
100
101
102
shell:
    "medaka tools annotate {input.vcf} {input.ref} {input.bam} {output.snps} && "
    "bgzip {output.snps} -c > {output.vcf_zipped} && tabix {output.vcf_zipped}"
126
127
128
129
130
shell:
    "touch {output.temp_file} && "
    "python {scripts_directory}add_freq_medaka_consensus.py {input.normal_reference_fasta} {input.vcf_file} {input.file_coverage} "
    "{output.vcf_file_out} {output.vcf_file_out_removed_by_filter} {output.temp_file} {output.normal_reference_fasta_removed} {output.temp_file} {params} &&"
    "bgzip -c -f {output.vcf_file_out} > {output.vcf_file_out_compr} && tabix {output.vcf_file_out_compr}"
147
148
shell:
    "bcftools consensus -s SAMPLE -f {input.ref} {input.vcf_ziped} -o {output}"
164
165
shell:
    "python {scripts_directory}mask_consensus_by_deep.py {REFERENCE_FASTA} {input.first_consensus} {output.align_file} {wildcards.seg}"
SnakeMake From line 164 of rules/medaka.smk
184
185
shell:
    "python {scripts_directory}msa_masker.py -i {input.align_file} -df {input.depth} -o {output} {params}"
SnakeMake From line 184 of rules/medaka.smk
205
206
shell:
    "python {scripts_directory}get_consensus_medaka.py '{input}' {output}"
SnakeMake From line 205 of rules/medaka.smk
224
225
shell:
    "python {scripts_directory}mask_regions.py {input} {output} {params}"
SnakeMake From line 224 of rules/medaka.smk
16
17
shell:
    "gunzip -cd {input} | NanoFilt {params} | gzip > {output}"
17
18
shell:
    "mkdir {output.dir} -p && NanoStat --fastq {input} --outdir {output.dir} -n {wildcards.sample}_stats.txt -t {threads}"
36
37
shell:
    "NanoStat --fastq {input} --outdir {output.dir} -n {wildcards.sample}_stats.txt -t {threads}"
17
18
19
shell:
    "abricate {params} {REFERENCE_FASTA} > {output.csv}"
    " && python3 {scripts_directory}get_abricate_info.py {output.csv} {output.yaml}"
35
36
shell:
    "echo 'This is not SARS-CoV-2' > {output}"
56
57
shell:
    "pangolin {input.consensus} --outfile {output} -t {threads}"
17
18
shell:
    "seqret {params} -sequence {input} -outseq {output}"
36
37
shell:
    "seqret {params} -sequence {input} -outseq {output}"
55
56
shell:
    "seqret {params} -sequence {input} -outseq {output}"
74
75
shell:
    "seqret {params} -sequence {input} -outseq {output}"
93
94
shell:
    "seqret {params} -sequence {input} -outseq {output}"
112
113
shell:
    "seqret {params} -sequence {input} -outseq {output}"
131
132
shell:
    "seqret {params} -sequence {input} -outseq {output}"
150
151
shell:
    "seqret {params} -sequence {input} -outseq {output}"
26
27
28
shell:
    "rm -r align_samples/{wildcards.sample}/snippy/ && "
    "snippy --cpus {threads} --pe1 {input.reads_1} --pe2 {input.reads_2} --ref {REFERENCE_FASTA} --outdir align_samples/{wildcards.sample}/snippy/ {params}"
50
51
52
shell:
    "rm -r align_samples/{wildcards.sample}/snippy/ && "
    "snippy --cpus {threads} --se {input.read} --ref {REFERENCE_FASTA} --outdir align_samples/{wildcards.sample}/snippy/ {params}"
71
72
shell:
    "gunzip -c {input.zipped} > {output.unzipped}"
88
89
90
shell:
    "python3 {scripts_directory}split_depth_file.py align_samples/{wildcards.sample}/snippy/depth/snps.depth {REFERENCE_GB} "
    "&& touch {output.unzipped}"
106
107
shell:
    "python {scripts_directory}mask_consensus_by_deep.py {REFERENCE_FASTA} {input.first_consensus} {output.align_file} {wildcards.seg}"
SnakeMake From line 106 of rules/snippy.smk
126
127
shell:
    "python {scripts_directory}msa_masker.py -i {input.align_file} -df {input.depth} -o {output} {params}"
SnakeMake From line 126 of rules/snippy.smk
147
148
shell:
    "python {scripts_directory}get_consensus_medaka.py '{input}' {output}"
SnakeMake From line 147 of rules/snippy.smk
166
167
shell:
    "python {scripts_directory}mask_regions.py {input} {output} {params}"
SnakeMake From line 166 of rules/snippy.smk
26
27
shell:
    "python {scripts_directory}create_snpeff_text.py $CONDA_PREFIX {input.ref_gb} {input.ref_fa} {REFERENCE_NAME} {output} "
46
47
48
shell:
    "{replace} {input.snp_file} &&"
    "snpEff {params} -v {REFERENCE_NAME} {input.snp_file} > {output}"
67
68
69
shell:
    "{replace} {input.i1} && "
    "snpEff {params} -v {REFERENCE_NAME} {input.i1}  > {output}"
22
23
shell:
    "spades.py -t {threads} {params} -s {input} -o {output.dir}"
44
45
shell:
    "spades.py -t {threads}  {params} -1 {input.read_1} -2 {input.read_2} -o {output.dir}"
16
17
shell:
    "python {scripts_directory}translate.py {REFERENCE_GB} {input.Alignment_nt} {output} '{wildcards.locus}' '{wildcards.gene}' {input.coverage}"
23
24
25
26
27
28
29
30
31
shell:
    "trimmomatic PE "
    "{input} "
    "{output.read_1} "
    "{output.read_un1} "
    "{output.read_2} "
    "{output.read_un2} "
    "-threads {threads} "
    "{params}"
50
51
52
53
54
55
shell:
    "trimmomatic SE "
    "-threads {threads} "
    "{input} "
    "{output} "
    "{params}"
19
20
shell:
    "python {scripts_directory}variants.py '{input}' '{output}' validated_variants"
40
41
shell:
    "python {scripts_directory}variants.py '{input}' {output} minor_iSNVs"
61
62
shell:
    "python {scripts_directory}variants.py '{input}' {output} minor_iSNVs_inc_indels"
82
83
shell:
    "python {scripts_directory}proportions_iSNVs_graph.py '{input}' {output.out_file}"
13
14
shell:
    "echo 'No sample with all segments above 90% coverage for the value given in the user/parameters.yaml file.' > {output}"
import os
import sys
from Bio import SeqIO
from Bio.Seq import MutableSeq
import pysam


def get_type_variation(ref, alt):
    """return type of variation based on change
    possible in variation type
                    snp	Single Nucleotide Polymorphism	A => T
                    mnp	Multiple Nuclotide Polymorphism	GC => AT
                    ins	Insertion	ATT => AGTT
                    del	Deletion	ACGG => ACG
                    complex	Combination of snp/mnp
    """
    if ref == alt == 1:
        return "snp"
    if ref > alt:
        return "del"
    if ref < alt:
        return "ins"
    return "mnp" if ref == alt and alt > 1 else "complex"


def variant_has_SR_DPSP_AR(variant):
    return "SR" in variant and "DPSP" in variant and "AR" in variant


def read_text_file(file_name):
    """
    Read a text file and return its non-empty, stripped lines as a list.
    """

    vect_out = []
    with open(file_name, "r", encoding="utf-8") as handle:
        for line in handle:
            sz_temp = line.strip()
            if len(sz_temp) == 0:
                continue
            vect_out.append(sz_temp)
    return vect_out


def get_coverage_by_pos(
    file_coverage, chr_name, position_start, position_end, temp_file
):
    if not os.path.exists(file_coverage):
        return -1
    cmd = f"tabix {file_coverage} {chr_name}:{position_start}-{position_end} > {temp_file}"
    os.system(cmd)
    # get number
    vect_lines = read_text_file(temp_file)
    if (
        len(vect_lines) == 1
        and len(vect_lines[0].split("\t")) == 3
        and isinstance(int(vect_lines[0].split("\t")[2]), int)
    ):
        return int(vect_lines[0].split("\t")[2])
    return -1


def add_freq_ao_ad_and_type_to_vcf(
    vcf_file,
    vcf_file_out,
    vcf_file_out_removed_by_filter,
    freq_vcf_limit,
    coverage_limit,
    file_coverage,
    temp_file,
):
    """add FREQ, AO, AF and TYPE to VCF, FREQ=AO/DP
    This case is used in MEDAKA only
    :param vcf_file_out_removed_by_filter -> can be None, keep the variants that are filter by  freq_vcf_limit
    vcffile must be gzip and tbi included
    :param coverage_limit -> filter by this coverage (this is necessary because medaka doesn't have)
    :param cut off for VCF freq
    returns: vcf file with freq, AO and AF
    """
    FREQ = "FREQ"
    AO = "AO"
    RO = "RO"
    AF = "AF"
    TYPE = "TYPE"
    DP_COMPOSED = "DP_COMPOSED"  # composed coverage: (DPSP-AR) / samtools depth

    # read the input file
    vcf_hanlder = pysam.VariantFile(vcf_file, "r")
    if (
        FREQ in vcf_hanlder.header.info
        and AO in vcf_hanlder.header.info
        and AF in vcf_hanlder.header.info
    ):
        vcf_hanlder.close()
        return

    vcf_hanlder_write = pysam.VariantFile(vcf_file_out, "w")
    if vcf_file_out_removed_by_filter is not None:
        vcf_hanlder_write_removed_by_filter = pysam.VariantFile(
            vcf_file_out_removed_by_filter, "w"
        )
    if FREQ not in vcf_hanlder.header.info:
        vcf_hanlder.header.info.add(
            FREQ, number="A", type="Float", description="Ratio of AO/(DPSP-AR)"
        )
    if AO not in vcf_hanlder.header.info:
        vcf_hanlder.header.info.add(
            AO,
            number="A",
            type="Integer",
            description="Alternate allele observation count, SR (alt1 fwd + alt1 rev, etc.)",
        )
    if RO not in vcf_hanlder.header.info:
        vcf_hanlder.header.info.add(
            RO,
            number="1",
            type="Integer",
            description="Reference allele observation count, SR (ref fwd + ref rev)",
        )
    if AF not in vcf_hanlder.header.info:
        vcf_hanlder.header.info.add(
            AF,
            number="R",
            type="Integer",
            description="Number of observation for each allele, SR (ref fwd + ref rev, alt1 fwd + alt1 rev, etc.)",
        )
    if TYPE not in vcf_hanlder.header.info:
        vcf_hanlder.header.info.add(
            TYPE,
            number="A",
            type="String",
            description="The type of allele, either snp, mnp, ins, del, or complex",
        )
    if DP_COMPOSED not in vcf_hanlder.header.info:
        vcf_hanlder.header.info.add(
            DP_COMPOSED,
            number="1",
            type="String",
            description="Coverage at position (DPSP-AR)/(samtools -aa). First is collected by Medaka, Second is collected by samtools.",
        )

    # write the header
    for variant_header_records in vcf_hanlder.header.records:
        vcf_hanlder_write.header.add_record(variant_header_records)
        if vcf_file_out_removed_by_filter is not None:
            vcf_hanlder_write_removed_by_filter.header.add_record(
                variant_header_records
            )

    for variant_sample in vcf_hanlder.header.samples:
        vcf_hanlder_write.header.add_sample(variant_sample)
        if vcf_file_out_removed_by_filter is not None:
            vcf_hanlder_write_removed_by_filter.header.add_sample(variant_sample)

    for variant in vcf_hanlder:
        # DP must be replaced by DPSP, the sum of all
        # spanning and ambiguous reads
        if variant_has_SR_DPSP_AR(variant.info):  # e.g. SR=0,0,15,6
            # effective depth: DPSP minus the ambiguous reads (AR)
            total_deep = int(variant.info["DPSP"]) - sum(
                int(x) for x in variant.info["AR"]
            )
            total_deep_samtools = get_coverage_by_pos(
                file_coverage,
                variant.chrom,
                variant.pos,
                variant.pos,
                temp_file,
            )
            if coverage_limit > 0 and total_deep_samtools < coverage_limit:
                continue
            if ((len(variant.info["SR"]) // 2) - 1) != len(variant.alts):
                # vcf_hanlder_write.write(variant)
                continue  # different numbers of Alleles and References

            # extra info
            vect_out_ao = []  # AO
            out_ro = -1  # RO
            vect_out_af = []  # AF
            vect_out_freq = []  # FREQ
            vect_out_freq_filtered = []  # FREQ
            vect_out_type = []  # TYPE

            for value_ in range(0, len(variant.info["SR"]), 2):
                if value_ > 0:
                    allele_count = int(variant.info["SR"][value_]) + int(
                        variant.info["SR"][value_ + 1]
                    )

                    if total_deep > 0:
                        # incongruences in Medaka,
                        # these values are collected in different stages of the Medaka workflow, (email from [email protected] at 23 Dec 2020)
                        if total_deep <= allele_count:
                            vect_out_freq.append(100)
                        else:
                            freq_value = allele_count / float(total_deep)
                            if freq_value >= freq_vcf_limit:
                                vect_out_freq.append(
                                    float("{:.1f}".format(freq_value * 100))
                                )
                            elif vcf_file_out_removed_by_filter is not None:
                                vect_out_freq_filtered.append(
                                    float("{:.1f}".format(freq_value * 100))
                                )
                        # print(variant.pos, variant.ref, str(variant.alts), variant.info['DP'], vect_out_ao[-1], vect_out_freq[-1])

                    vect_out_ao.append(allele_count)
                    vect_out_type.append(
                        get_type_variation(
                            len(variant.ref),
                            len(variant.alts[(value_ - 2) >> 1]),
                        )
                    )
                vect_out_af.append(
                    int(variant.info["SR"][value_])
                    + int(variant.info["SR"][value_ + 1])
                )
                if out_ro == -1:
                    out_ro = int(variant.info["SR"][value_]) + int(
                        variant.info["SR"][value_ + 1]
                    )

            # has some variant to save
            if vect_out_freq:
                if out_ro > -1:
                    variant.info[RO] = (out_ro, )
                variant.info[AO] = tuple(vect_out_ao)
                variant.info[AF] = tuple(vect_out_af)
                variant.info[TYPE] = tuple(vect_out_type)
                variant.info[DP_COMPOSED] = (f"{total_deep}/{total_deep_samtools}", )
                variant.info[FREQ] = tuple(vect_out_freq)

                # Only save the ones with FREQ
                vcf_hanlder_write.write(variant)

            # save the filtered
            if vect_out_freq_filtered:
                if out_ro > -1:
                    variant.info[RO] = (out_ro, )
                variant.info[AO] = tuple(vect_out_ao)
                variant.info[AF] = tuple(vect_out_af)
                variant.info[TYPE] = tuple(vect_out_type)
                variant.info[DP_COMPOSED] = (f"{total_deep}/{total_deep_samtools}", )
                variant.info[FREQ] = tuple(vect_out_freq_filtered)

                # Only save the ones with FREQ
                vcf_hanlder_write_removed_by_filter.write(variant)

    vcf_hanlder_write.close()
    vcf_hanlder.close()
    if vcf_file_out_removed_by_filter is not None:
        vcf_hanlder_write_removed_by_filter.close()
    return vcf_file_out


def compute_masking_sites(
    sequence,
    ranges=None,
    single_positions=None,
    from_beggining=None,
    from_end=None,
) -> list[int]:
    length = len(sequence)
    masking_sites = []
    if ranges is not None:
        for pos in ranges:
            # print(ranges)
            pos[0] = int(pos[0])
            pos[1] = int(pos[1])
            if pos[1] < length and pos[1] > pos[0]:
                masking_sites.extend(iter(range(pos[0] - 1, pos[1] - 1)))
    if single_positions is not None:
        for pos in single_positions:
            pos = int(pos)
            if pos < length:
                masking_sites.append(pos - 1)
    if from_beggining is not None:
        masking_sites.extend(iter(range(int(from_beggining))))
    if from_end is not None:
        masking_sites.extend(iter(range(length - int(from_end), length)))
    return masking_sites


def main():
    freq_vcf_limit: float = float(sys.argv[10])
    coverage_limit: float = float(sys.argv[9])
    element_name_old = ""
    vect_sites = []
    vect_ranges = []
    final_mask_consensus = []

    vcf_file = sys.argv[2]
    vcf_file_out = sys.argv[4]
    vcf_file_out_removed_by_filter = sys.argv[5]
    file_coverage = sys.argv[3]
    temp_file = sys.argv[8]
    add_freq_ao_ad_and_type_to_vcf(
        vcf_file,
        vcf_file_out,
        vcf_file_out_removed_by_filter,
        freq_vcf_limit,
        coverage_limit,
        file_coverage,
        temp_file,
    )

    final_vcf_with_removed_variants = sys.argv[5]

    vcf_hanlder = pysam.VariantFile(final_vcf_with_removed_variants, "r")

    vect_sites = []
    vect_ranges = []
    for variant in vcf_hanlder:
        if element_name_old != variant.chrom:
            element_name_old = variant.chrom
        # MEDAKA output must have "TYPE" in info
        if variant.info["TYPE"][0] == "snp":
            vect_sites.append(str(variant.pos))
        elif variant.info["TYPE"][0] == "ins":
            vect_sites.append(str(variant.pos))
        elif variant.info["TYPE"][0] == "del":
            vect_ranges.append(
                f"{variant.pos + len(variant.alts[0])}-{variant.pos - len(variant.alts[0]) + len(variant.ref)}"
            )
        else:
            vect_ranges.append(f"{variant.pos}-{variant.pos + len(variant.ref) - 1}")

    sites = "".join(
        i if idx == len(vect_sites) - 1 else f"{i},"
        for idx, i in enumerate(vect_sites)
    )
    regions = "".join(
        i if idx == len(vect_ranges) - 1 else f"{i},"
        for idx, i in enumerate(vect_ranges)
    )
    # print(sites)
    # print(regions)

    consensus = sys.argv[1]
    output = sys.argv[7]
    final_mask_consensus = []

    ranges, single_positions, from_beggining, from_end = (
        [x.split("-") for x in regions.split(",")] if regions != "" else None,
        sites.split(",") if sites != "" else None,
        None,
        None,
    )

    ref_insertions = 0
    for record in SeqIO.parse(consensus, "fasta"):
        sequence = MutableSeq(record.seq)
        masking_sites = compute_masking_sites(
            sequence, ranges, single_positions, from_beggining, from_end
        )
        masking_sites = sorted(masking_sites)
        # Taken from INSaFLU: walk the consensus, replacing masked reference
        # positions with N while tracking alignment gaps.
        ref_pos = 0
        gap = 0
        for idx in range(len(sequence)):
            if sequence[idx] == "-":
                gap += 1
                # ref_insertions += 1
                # continue
            if ref_pos in masking_sites:
                sequence[ref_pos + ref_insertions] = "N"
            ref_pos += 1
            if (ref_pos + ref_insertions) >= len(record.seq):
                break
        # End of INSaFLU code
        record.seq = sequence
        final_mask_consensus.append(record)

    SeqIO.write(final_mask_consensus, output, "fasta")


if __name__ == "__main__":
    main()
from Bio import SeqIO


def is_fasta(filename: str) -> bool:
    """
    The is_fasta function checks if a file is in FASTA format.

    :param filename: str: Specify the file to be checked
    :return: True if the file is a fasta file, and false otherwise
    :doc-author: Trelent
    """
    with open(filename, "r", encoding="utf-8") as handle:
        fasta = SeqIO.parse(handle, "fasta")
        return any(fasta)


def is_genbank(filename: str) -> bool:
    """
    The is_genbank function checks if a file is in GenBank format.

    :param filename: str: Specify the file to be checked
    :return: True if the file is a genbank file, and false otherwise
    :doc-author: Trelent
    """
    with open(filename, "r", encoding="utf-8") as handle:
        gbk = SeqIO.parse(handle, "genbank")
        return any(gbk)


def same_identifiers(fasta_file: str, gbk_file: str) -> bool:
    """
    The same_identifiers function checks whether the IDs in a FASTA file match those
    in a GenBank file. It does this by opening both files and parsing them with
    the SeqIO module from Biopython. The function returns True if the lists of IDs
    are identical, and False otherwise.

    :param fasta_file: str: Specify the path to the fasta file
    :param gbk_file: str: Specify the path to the genbank file
    :return: True if the order of the ids in both files are equal
    :doc-author: Trelent
    """
    with open(fasta_file, "r", encoding="utf-8") as handle:
        fasta = SeqIO.parse(handle, "fasta")  # type: ignore
        fasta_ids: list[str] = [record.id for record in fasta]
    with open(gbk_file, "r", encoding="utf-8") as handle:
        gbk = SeqIO.parse(handle, "genbank")
        gbk_locus: list[str] = [record.name for record in gbk]
    return sorted(fasta_ids) == sorted(gbk_locus)
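
A minimal usage sketch for the helpers above, validating a reference FASTA/GenBank pair before it is handed to the workflow (the module and file names are hypothetical):

from check_reference import is_fasta, is_genbank, same_identifiers  # hypothetical module name

REF_FASTA = "reference/my_reference.fasta"  # hypothetical paths
REF_GBK = "reference/my_reference.gbk"

# The pair is usable only if both files parse and their record identifiers match
if is_fasta(REF_FASTA) and is_genbank(REF_GBK) and same_identifiers(REF_FASTA, REF_GBK):
    print("Reference FASTA and GenBank files are consistent.")
else:
    print("Reference FASTA and GenBank files do not match.")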
import sys
import csv
import re
from Bio import SeqIO
from extract_gb_info import get_locus
from yaml_io import read_yaml


def get_fasta_reference_concat(fasta_ref):
    """
    The get_fasta_reference_concat function takes a FASTA file as input and returns the concatenated
    reference sequence.
    The function is used to create a single, concatenated reference sequence from multiple contigs
    in the assembly.

    :param fasta_ref: Specify the reference genome to be used for mapping
    :return: The concatenated reference sequence in the form of a seqrecord
    """
    concat_reference = []
    count = 0
    for record in SeqIO.parse(fasta_ref, "fasta"):
        if count == 0:
            concat_reference.append(record)
            start = fasta_ref.index("reference")
            end = fasta_ref.index(".fasta")
            # why + 11 => 16/05
            concat_reference[0].id = fasta_ref[start + 11 : end]
        else:
            concat_reference[0].seq += record.seq
        count += 1
    return concat_reference[0]


def get_id_sequence_from_consensus(consensus, sample_name, value_list):
    """
    The get_id_sequence_from_consensus function takes a fasta file of consensus sequences and
    returns the sequence of the locus that is specified by the value_list. The first record in this
    fasta file will be returned with its id changed to sample_name.
    This function assumes that there are no duplicate records in consensus, which may not be true.

    :param consensus: Define the consensus sequence file
    :param sample_name: Name the sequence in the seqrecord object
    :param value_list: Specify which segments to concatenate
    :return: A seqrecord object that contains the consensus sequence for a given sample
    :doc-author: Trelent
    """

    concat_consensus = []
    count = 1

    starting_segment = value_list[0]
    for record in SeqIO.parse(consensus, "fasta"):
        if count == starting_segment:
            concat_consensus.append(record)
            concat_consensus[0].id = sample_name
        elif count in value_list:
            concat_consensus[0].seq += record.seq
        count += 1
    return concat_consensus[0]


def get_id_sequence_from_consensus_strict(consensus, sample_name, value_list):
    """
    The get_id_sequence_from_consensus_strict function takes a consensus sequence file, the sample name, and a list of integers.
    It then parses through the consensus sequence file to find all sequences that match the starting segment in the list.
    The function concatenates these sequences together into one record and assigns it an ID based on its sample name.  It returns this record.

    :param consensus: Specify the path to a fasta file containing the consensus sequence for each sample
    :param sample_name: Name the sequence
    :param value_list: Specify the segments that will be concatenated
    :return: A seqrecord object from a fasta file
    :doc-author: Trelent
    """
    concat_consensus = []
    count = 1

    starting_segment = value_list[0]
    for record in SeqIO.parse(consensus, "fasta"):
        if count == starting_segment:
            concat_consensus.append(record)
            concat_consensus[0].id = ""
            concat_consensus[0].id = sample_name
            concat_consensus[0].description = ""
        elif count in value_list:
            concat_consensus[0].seq += record.seq
        count += 1
    return concat_consensus[0]


def create_consensus_file_for_alignment(
    consensus_list: list,
    reference_gb: str,
    output: str,
    coverage_file: str,
    reference_fasta: str,
    output_only_90_plus: str,
):
    """
    The create_consensus_file_for_alignment function takes a list of consensus files, the reference genome in
    GenBank format, an output file name, a coverage file and the reference FASTA as input. It creates a FASTA
    file with all the sequences from the consensus files whose coverage is above the configured threshold.
    It also takes the reference structure into account: for a segmented reference (e.g. influenza) one output
    only includes samples with all 8 loci covered, while the other concatenates every segment that passes the
    coverage threshold.

    :param consensus_list: List of paths to the per-sample consensus sequences
    :param reference_gb: Reference genome in GenBank format
    :param output: Name of the output file
    :param coverage_file: File containing the coverage values for each sample
    :param reference_fasta: Reference genome in FASTA format
    :param output_only_90_plus: FASTA file with only the sequences above the coverage threshold
    :return: A fasta file with the consensus sequences of all samples that have a coverage over 90%
    :doc-author: Trelent
    """
    with open(coverage_file, encoding="UTF8", newline="") as csvfile:
        csv_reader = csv.reader(csvfile, delimiter=",")
        coverage_list = list(csv_reader)
    dic_directory = read_yaml("../config/constants.yaml")

    software_parameters = read_yaml(dic_directory["software_parameters"])
    coverage_value = software_parameters["min_coverage_consensus"]
    for row in coverage_list:
        for value in range(len(row) - 1, 0, -1):
            # print(row[value])
            if float(row[value]) > coverage_value:
                row[value] = value
            else:
                row.pop(value)

    coverage_dic = {}
    for i in coverage_list:
        coverage_dic[i[0]] = i[1:]

    fasta_reference_concat = get_fasta_reference_concat(reference_fasta)
    new_concat_file = [fasta_reference_concat]
    new_concat_file_strict = [fasta_reference_concat]
    if len(get_locus(reference_gb)) > 1:
        for file in consensus_list:
            sample_name = re.findall("(?<=sample_)(.*?)(?=/)", file)[0]
            if len(coverage_dic[sample_name]) > 0:
                new_sequence = get_id_sequence_from_consensus(
                    file, sample_name, coverage_dic[sample_name]
                )
                new_concat_file.append(new_sequence)
            if len(coverage_dic[sample_name]) == 8:
                new_sequence = get_id_sequence_from_consensus_strict(
                    file, sample_name, coverage_dic[sample_name]
                )
                new_concat_file_strict.append(new_sequence)
        SeqIO.write(new_concat_file, output_only_90_plus, "fasta")
        SeqIO.write(new_concat_file_strict, output, "fasta")

    else:
        for file in consensus_list:
            sample_name = re.findall("(?<=sample_)(.*?)(?=/)", file)[0]
            if len(coverage_dic[sample_name]) == 1:
                new_sequence = get_id_sequence_from_consensus_strict(
                    file, sample_name, coverage_dic[sample_name]
                )
                new_concat_file_strict.append(new_sequence)
        SeqIO.write(new_concat_file_strict, output_only_90_plus, "fasta")
        SeqIO.write(new_concat_file_strict, output, "fasta")


if __name__ == "__main__":
    CONSENSUS_LIST = sys.argv[1].split(" ")
    REFERENCE_GB = sys.argv[2]
    OUTPUT_FILE = sys.argv[3]
    COVERAGE_FILE = sys.argv[4]
    REFERENCE_FASTA = sys.argv[5]
    OUTPUT_FILE_WITH_90_PLUS_COVERAGE = sys.argv[6]

    create_consensus_file_for_alignment(
        CONSENSUS_LIST,
        REFERENCE_GB,
        OUTPUT_FILE,
        COVERAGE_FILE,
        REFERENCE_FASTA,
        OUTPUT_FILE_WITH_90_PLUS_COVERAGE,
    )
import os
import sys
import re
import errno
import argparse


def parse_args(args=None):
    Description = "Convert iVar variants tsv file to vcf format."
    Epilog = """Example usage: python ivar_variants_to_vcf.py <FILE_IN> <FILE_OUT>"""

    parser = argparse.ArgumentParser(description=Description, epilog=Epilog)
    parser.add_argument("FILE_IN", help="Input tsv file.")
    parser.add_argument("FILE_OUT", help="Full path to output vcf file.")
    parser.add_argument(
        "-po",
        "--pass_only",
        dest="PASS_ONLY",
        help="Only output variants that PASS all filters.",
        action="store_true",
    )
    parser.add_argument(
        "-af",
        "--allele_freq_thresh",
        type=float,
        dest="ALLELE_FREQ_THRESH",
        default=0,
        help="Only output variants where allele frequency greater than this number (default: 0).",
    )

    return parser.parse_args(args)


def make_dir(path):
    if not len(path) == 0:
        try:
            os.makedirs(path)
        except OSError as exception:
            if exception.errno != errno.EEXIST:
                raise


def ivar_variants_to_vcf(FileIn, FileOut, passOnly=False, minAF=0):
    filename = os.path.splitext(FileIn)[0]
    header = (
        "##fileformat=VCFv4.2\n"
        "##source=iVar\n"
        '##INFO=<ID=DP,Number=1,Type=Integer,Description="Total Depth">\n'
        '##FILTER=<ID=PASS,Description="Result of p-value <= 0.05">\n'
        '##FILTER=<ID=FAIL,Description="Result of p-value > 0.05">\n'
        '##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype">\n'
        '##FORMAT=<ID=REF_DP,Number=1,Type=Integer,Description="Depth of reference base">\n'
        '##FORMAT=<ID=REF_RV,Number=1,Type=Integer,Description="Depth of reference base on reverse reads">\n'
        '##FORMAT=<ID=REF_QUAL,Number=1,Type=Integer,Description="Mean quality of reference base">\n'
        '##FORMAT=<ID=ALT_DP,Number=1,Type=Integer,Description="Depth of alternate base">\n'
        '##FORMAT=<ID=ALT_RV,Number=1,Type=Integer,Description="Depth of alternate base on reverse reads">\n'
        '##FORMAT=<ID=ALT_QUAL,Number=1,Type=String,Description="Mean quality of alternate base">\n'
        '##FORMAT=<ID=ALT_FREQ,Number=1,Type=String,Description="Frequency of alternate base">\n'
    )
    header += (
        "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\t" + filename + "\n"
    )

    varList = []
    varCountDict = {"SNP": 0, "INS": 0, "DEL": 0}
    OutDir = os.path.dirname(FileOut)
    make_dir(OutDir)
    fout = open(FileOut, "w")
    fout.write(header)
    with open(FileIn) as f:
        for line in f:
            if not re.match("REGION", line):
                line = re.split("\t", line)
                CHROM = line[0]
                POS = line[1]
                ID = "."
                REF = line[2]
                ALT = line[3]
                var_type = "SNP"
                if ALT[0] == "+":
                    ALT = REF + ALT[1:]
                    var_type = "INS"
                elif ALT[0] == "-":
                    REF += ALT[1:]
                    ALT = line[2]
                    var_type = "DEL"
                QUAL = line[9]
                pass_test = line[13]
                if pass_test == "TRUE":
                    FILTER = "PASS"
                else:
                    FILTER = "FAIL"
                INFO = "DP=" + line[11]
                FORMAT = "GT:REF_DP:REF_RV:REF_QUAL:ALT_DP:ALT_RV:ALT_QUAL:ALT_FREQ"
                SAMPLE = (
                    "1:"
                    + line[4]
                    + ":"
                    + line[5]
                    + ":"
                    + line[6]
                    + ":"
                    + line[7]
                    + ":"
                    + line[8]
                    + ":"
                    + line[9]
                    + ":"
                    + line[10]
                )
                oline = (
                    CHROM
                    + "\t"
                    + POS
                    + "\t"
                    + ID
                    + "\t"
                    + REF
                    + "\t"
                    + ALT
                    + "\t"
                    + QUAL
                    + "\t"
                    + FILTER
                    + "\t"
                    + INFO
                    + "\t"
                    + FORMAT
                    + "\t"
                    + SAMPLE
                    + "\n"
                )
                writeLine = True
                if passOnly and FILTER != "PASS":
                    writeLine = False
                if float(line[10]) < minAF:
                    writeLine = False
                if (CHROM, POS, REF, ALT) in varList:
                    writeLine = False
                else:
                    varList.append((CHROM, POS, REF, ALT))
                if writeLine:
                    varCountDict[var_type] += 1
                    fout.write(oline)
    fout.close()

    ## Print variant counts to pass to MultiQC
    # varCountList = [(k, str(v)) for k, v in sorted(varCountDict.items())]
    # print("\t".join(["sample"] + [x[0] for x in varCountList]))
    # print("\t".join([filename] + [x[1] for x in varCountList]))


def main(args=None):
    args = parse_args(args)
    ivar_variants_to_vcf(
        args.FILE_IN, args.FILE_OUT, args.PASS_ONLY, args.ALLELE_FREQ_THRESH
    )


if __name__ == "__main__":
    sys.exit(main())
import subprocess
import sys
from extract_gb_info import get_id_version, get_locus

CONFIG_FILE = "../workflow/db/snpeff.config"
DATA_DIRECTORY = "../workflow/db/data/"


def prepare_snpeff_run(ref_path_gb, locus, ref_path_fa, reference_name, output):
    """
    The prepare_snpeff_run function creates a snpEFF database for the reference genome.
    It takes as input:
        - The path to the reference genome in GenBank format, and
        - The name of the output file that will be created by this function.

       This function creates a directory called 'config/data' if it does not already exist,
       and then creates three files inside that directory: genes.gbk, sequences.fa,
       and snpeff_genes_prod_snpsiftdb-passed-only-geneidmap
       (the last one is created by running SnpSift).

    :param ref_path_gb: Specify the path to the genbank file of a reference genome
    :param locus: List of locus names in the reference genome
    :param ref_path_fa: Specify the path to the reference genome fasta file
    :param reference_name: Name given to the reference in the snpEff config file
    :param output: Create a file that indicates the database is ready
    :return: A text file with the message "Database Ready!"
    :doc-author: Trelent
    """
    if len(locus) > 1:
        text = [
            f"{reference_name}.chromosome : {' '.join(str(locus))}\n",
            f"{reference_name}.genome : flu\n",
        ]
        for i in locus:
            text.append(
                f"{reference_name}.{i}.codonTable : Bacterial_and_Plant_Plastid\n"
            )
    else:
        text = [
            f"\n{reference_name}.genome: {reference_name}\n",
            f"{reference_name}.{get_id_version(ref_path_gb)}.codonTable : Bacterial_and_Plant_Plastid\n",
        ]
    with open(CONFIG_FILE, "r", encoding="UTF8") as snpeff_config:
        lines = snpeff_config.readlines()
        joined = "".join(lines)
        find = joined.find("".join(text))

    if find == -1:
        subprocess.run(
            f"mkdir  {DATA_DIRECTORY}{reference_name} -p", shell=True, check=False
        )
        subprocess.run(
            f"cat {ref_path_gb} > {DATA_DIRECTORY}{reference_name}/genes.gbk",
            shell=True,
            check=False,
        )
        subprocess.run(
            f"cat {ref_path_fa} > {DATA_DIRECTORY}{reference_name}/sequences.fa",
            shell=True,
            check=False,
        )
        with open(CONFIG_FILE, "a", encoding="UTF8") as snpeff:
            snpeff.writelines(text)

        subprocess.run(
            f"snpEff build -genbank {reference_name} -c {CONFIG_FILE} > {output}",
            shell=True,
            check=False,
        )
    else:
        with open(output, "w", encoding="UTF8") as out:
            out.write("Database Ready!")


if __name__ == "__main__":
    SNPEFF_PATH = sys.argv[1]
    REFERENCE_GB = sys.argv[2]
    REFERENCE_FASTA = sys.argv[3]
    REFERENCE_NAME = sys.argv[4]
    OUTPUT = sys.argv[5]
    SEGMENTS_LIST = get_locus(REFERENCE_GB)
    prepare_snpeff_run(
        REFERENCE_GB, SEGMENTS_LIST, REFERENCE_FASTA, REFERENCE_NAME, OUTPUT
    )
import os
import sys
from extract_gb_info import get_id_version, get_locus


def prepare_snpeff_run(snpeff_path, ref_path_gb, locus, ref_path_fa, reference_name):
    if len(locus) > 1:
        text = [
            f"{reference_name}.chromosome : {' '.join(str(locus))}\n",
            f"{reference_name}.genome : flu\n",
        ]
        for i in locus:
            text.append(
                f"{reference_name}.{i}.codonTable : Bacterial_and_Plant_Plastid\n"
            )
    else:
        text = [
            f"\n{reference_name}.genome: {reference_name}\n",
            f"{reference_name}.{get_id_version(ref_path_gb)}.codonTable : Bacterial_and_Plant_Plastid\n",
        ]

    with open(
        f"{snpeff_path}/share/snpeff-4.3.1t-5/snpEff.config", "r", encoding="UTF8"
    ) as snpeff_config_file:
        lines = snpeff_config_file.readlines()
        joined = "".join(lines)
        find = joined.find("".join(text))

    if find == -1:
        os.system(
            f"mkdir  {snpeff_path}/share/snpeff-4.3.1t-5/data/{reference_name} ")
        os.system(
            f"cat {ref_path_gb} > {snpeff_path}/share/snpeff-4.3.1t-5/{reference_name}/genes.gbk"
        )
        os.system(
            f"cat {ref_path_fa} > {snpeff_path}/share/snpeff-4.3.1t-5/{reference_name}/sequences.fa"
        )
        with open(
            f"{snpeff_path}/share/snpeff-4.3.1t-5/snpEff.config", "a", encoding="UTF8"
        ) as snpeff:
            snpeff.writelines(text)

        os.system(f"snpEff build -genbank {reference_name}")


if __name__ == "__main__":
    SNPEFF_PATH = sys.argv[1]
    REFERENCE_GB = sys.argv[2]
    REFERENCE_FASTA = sys.argv[3]
    REFERENCE_NAME = sys.argv[4]
    SEGMENTS_LIST = get_locus(REFERENCE_GB)
    prepare_snpeff_run(
        SNPEFF_PATH, REFERENCE_GB, SEGMENTS_LIST, REFERENCE_FASTA, REFERENCE_NAME
    )
from Bio import SeqIO


def get_locus(genbank_file):
    """
    The get_locus function takes a GenBank file and returns the locus name of each record.

    :param genbank_file: Path to the GenBank file, parsed with SeqIO
    :return: A list of locus names (one per record)
    :doc-author: Trelent
    """
    locus = []
    with open(genbank_file, encoding="utf-8") as handle_gb:
        for record in SeqIO.parse(handle_gb, "genbank"):
            locus.append(record.name)
    return locus


def get_locus_len(genbank_file):
    """
    The get_locus_len function takes a GenBank file and returns the sequence length of each record.

    :param genbank_file: Path to the GenBank file, parsed with SeqIO
    :return: A list of sequence lengths (one per record)
    :doc-author: Trelent
    """
    locus = []
    with open(genbank_file, encoding="utf-8") as handle_gb:
        for record in SeqIO.parse(handle_gb, "genbank"):
            locus.append(len(record.seq))
    return locus


def get_locus_and_genes(genbank_file):
    """
    The get_locus_and_genes function takes a genbank file as input and returns a dictionary of
    locus tags and genes.
    The function iterates through the features in each record, checking if they are CDSs. If so,
    it checks if the gene qualifier is present or not. If it is present, then that value is used
    for the gene name; otherwise, the locus_tag
    is used instead.

    :param genbank_file:str: Specify the file that contains the genbank information
    :return: A dictionary with the locus tag as key and a list of gene names as value
    :doc-author: Trelent
    """

    locus_gene = {}
    with open(genbank_file, encoding="utf-8") as handle_gb:
        for record in SeqIO.parse(handle_gb, "genbank"):
            locus_gene[record.name] = []
            for feat in record.features:
                if feat.type == "CDS":
                    if feat.qualifiers.get("locus_tag", False):
                        locus_gene[record.name].append(
                            feat.qualifiers["locus_tag"][0].replace(" ", "_")
                        )
                    elif feat.qualifiers.get("gene", False):
                        locus_gene[record.name].append(
                            feat.qualifiers["gene"][0].replace(" ", "_")
                        )
                    elif feat.qualifiers.get("note", False):
                        locus_gene[record.name].append(
                            feat.qualifiers["note"][0].replace(" ", "_")
                        )
                    elif feat.qualifiers.get("product", False):
                        locus_gene[record.name].append(
                            feat.qualifiers["product"][0].replace(" ", "_")
                        )
    return locus_gene


def get_gb_name(genbank_file):
    with open(genbank_file, encoding="utf-8") as handle_gb:
        gbk = list(SeqIO.parse(handle_gb, "genbank"))[0]
        return gbk.name


def get_id_version(genbank_file):
    """
    The get_id_version function takes a GenBank file and returns the accession version found
    on the first VERSION line.

    :param genbank_file: Path to the GenBank file
    :return: The VERSION identifier as a string, or an empty string if none is found
    :doc-author: Trelent
    """
    with open(genbank_file, encoding="utf-8") as handle_gb:
        for line in handle_gb:
            if "VERSION" in line:
                return line.split(" ")[-1].strip()
    return ""


def get_positions_gb(genbank_file):
    """
    The get_positions_gb function takes a genbank file as input and returns a list of lists.
    Each sublist contains the name of the gene, and its start and end positions in that particular
    sequence.
    The get_positions_gb function is called by other functions to retrieve this information.

    :param genbank_file: Open the genbank file and parse it
    :return: A list of lists
    :doc-author: Trelent
    """
    positions = []
    with open(genbank_file, encoding="utf-8") as handle_gb:
        for record in SeqIO.parse(handle_gb, "genbank"):
            for feat in record.features:
                if feat.type == "CDS":
                    if feat.qualifiers.get("locus_tag", False):
                        positions.append(
                            [
                                feat.qualifiers.get("locus_tag")[0].replace(" ", "_"),
                                feat.location,
                            ]
                        )
                    elif feat.qualifiers.get("gene", False):
                        positions.append(
                            [
                                feat.qualifiers["gene"][0].replace(" ", "_"),
                                feat.location,
                            ]
                        )
                    elif feat.qualifiers.get("note", False):
                        positions.append(
                            [
                                feat.qualifiers["note"][0].replace(" ", "_"),
                                feat.location,
                            ]
                        )
                    elif feat.qualifiers.get("product", False):
                        positions.append(
                            [
                                feat.qualifiers["product"][0].replace(" ", "_"),
                                feat.location,
                            ]
                        )
    positions_clean = []

    for idx, gene in enumerate(positions):
        positions_clean.append((gene[0], []))
        for part in gene[1].parts:
            positions_clean[idx][1].append([int(part.start), int(part.end)])
    return positions_clean


def get_genes(genbank_file):
    """
    The get_genes function takes a genbank file as input and returns a list of
    all the genes in that file.
    The function is used to create a list of all the genes in each genome.

    :param genbank_file: Specify the name of the file that contains all of
    the genes in a genome
    :return: A list of all the genes in the genbank file
    :doc-author: Trelent
    """
    genes = []
    with open(genbank_file, encoding="utf-8") as handle_gb:
        for record in SeqIO.parse(handle_gb, "genbank"):
            for features in record.features:
                if features.type == "CDS":
                    if features.qualifiers.get("gene"):
                        genes.append(features.qualifiers["gene"][0].replace(" ", "_"))
                    elif features.qualifiers.get("locus_tag"):
                        genes.append(
                            features.qualifiers["locus_tag"][0].replace(" ", "_")
                        )
                    elif features.qualifiers.get("note"):
                        genes.append(features.qualifiers["note"][0].replace(" ", "_"))
                    elif features.qualifiers.get("product"):
                        genes.append(
                            features.qualifiers["product"][0].replace(" ", "_")
                        )
    return genes


def get_identification_version(segments, reference_gb):
    """
    The get_identification_version function takes the list of reference segments
    and the path to a reference GenBank file.
    For a single-segment reference it returns the accession and version parsed from the
    VERSION line (falling back to the record name when no version is present); for
    multi-segment references it returns empty strings.

    :param segments:list: Determine if the reference is a single segment or not
    :param reference_gb:str: Specify the reference genome in genbank format
    :return: A tuple with the identification and version of the reference
    genome
    :doc-author: Trelent
    """
    if len(segments) == 1:
        version_id = get_id_version(reference_gb).split(".")
        identification = version_id[0] if len(version_id) > 0 else ""
        version = version_id[1] if len(version_id) > 1 else ""
        if len(version_id) == 1:
            new_id = get_gb_name(reference_gb)
            return new_id, ""
    else:
        identification = ""
        version = ""
    return identification, version


def get_identification_version_string(segments, reference_gb):
    """
    The get_identification_version_string function mirrors get_identification_version but
    returns a single "identification.version" string instead of a tuple (just the record
    name when the reference has no version, and "." for multi-segment references).

    :param segments:list: Determine if the reference is a single segment or not
    :param reference_gb:str: Specify the reference genome in genbank format
    :return: A string with the identification and version of the reference genome
    :doc-author: Trelent
    """
    if len(segments) == 1:
        version_id = get_id_version(reference_gb).split(".")
        identification = version_id[0] if len(version_id) > 0 else ""
        version = version_id[1] if len(version_id) > 1 else ""
        if len(version_id) == 1:
            new_id = get_gb_name(reference_gb)
            return new_id
    else:
        identification = ""
        version = ""
    return f"{identification}.{version}"
import sys
import csv
from yaml_io import read_yaml
from Bio import SeqIO
from extract_gb_info import get_locus


def get_locus_out_id(string_id: str):
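    # keep everything after the last "__" separator, e.g. "sample__locus" -> "locus"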
    return string_id[len(string_id) - string_id[::-1].index("__"):]


def convert_index_to_locus(coverage: dict[str, list[int]], genbank: str) -> dict[str, list[str]]:
    locus_names: list[str] = get_locus(genbank)

    for sample in coverage:
        for idx, value in enumerate(coverage[sample]):
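            # the coverage table stores 1-based segment indexes; map each back to its locus name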
            coverage[sample][idx] = locus_names[value-1]
    return coverage


def generate_AllConsensus(
    coverage_file,
    reference_gb,
    input_files,
    reference_fasta,
    output_w_ref,
    output_no_ref,
):
    """
    The generate_AllConsensus function gathers the consensus sequences of every sample whose
    segments pass the minimum coverage threshold (min_coverage_consensus in the software
    parameters) and writes two multi-FASTA files: one that also contains the reference
    sequences and one with the sample consensus sequences only.

    :param coverage_file: CSV file with the per-segment coverage values of each sample
    :param reference_gb: Reference in GenBank format, used to translate segment indexes into locus names
    :param input_files: List of per-sample consensus FASTA files
    :param reference_fasta: Reference sequences in FASTA format
    :param output_w_ref: Output FASTA that includes the reference sequences
    :param output_no_ref: Output FASTA without the reference sequences
    :return: None; the two FASTA files are written to disk
    :doc-author: Trelent
    """
    with open(coverage_file, newline="") as csvfile:
        csv_reader = csv.reader(csvfile, delimiter=",")
        coverage_list = list(csv_reader)

    dic_directory = read_yaml("../config/constants.yaml")

    software_parameters = read_yaml(dic_directory["software_parameters"])

    coverage_value = software_parameters["min_coverage_consensus"]

    for row in coverage_list:
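        # walk the row backwards so pop() does not shift the indexes still to be checked;
        # a coverage value that passes the threshold is replaced by its 1-based segment index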
        for value in range(len(row) - 1, 0, -1):
            if float(row[value]) >= coverage_value:
                row[value] = value
            else:
                row.pop(value)
    coverage_dic = {}
    for i in coverage_list:
        coverage_dic[i[0]] = i[1:]

    coverage_dic = convert_index_to_locus(coverage_dic, reference_gb)

    final_consensus_w_ref = []
    final_consensus_no_ref = []
    for record in SeqIO.parse(reference_fasta, "fasta"):
        final_consensus_w_ref.append(record)
    for sample in coverage_dic:
        for file in input_files:
            if sample in file:
                record: SeqIO.SeqRecord
                for record in SeqIO.parse(file, "fasta"):
                    if get_locus_out_id(record.id) in coverage_dic[sample]:
                        final_consensus_w_ref.append(record)
                        final_consensus_no_ref.append(record)

    SeqIO.write(final_consensus_w_ref, output_w_ref, "fasta")
    SeqIO.write(final_consensus_no_ref, output_no_ref, "fasta")


if __name__ == "__main__":
    coverage_file = sys.argv[1]
    reference_gb = sys.argv[2]
    input_files = sys.argv[3].split(" ")
    reference_fasta = sys.argv[4]
    output_w_ref = sys.argv[5]
    output_no_ref = sys.argv[6]
    generate_AllConsensus(
        coverage_file,
        reference_gb,
        input_files,
        reference_fasta,
        output_w_ref,
        output_no_ref,
    )
import csv
import re
from collections import Counter
import sys


def get_abricate_info(path):
    """
    The get_abricate_info function takes the path to an abricate output file and returns
    the species and genera reported in it. The function reads the csv file, scans each row
    for "genus" and "species" tags and collects the matching values. In this variant the
    unique values are returned as sets; no frequency counting is done (Counter is unused here).

    :param path:str: Specify the path to the abricate file
    :return: A two-element list: the set of species and the set of genera found ("" when none)
    """

    with open(path, encoding="utf-8") as csv_handler:
        reader = csv.reader(csv_handler)
        info_list = list(reader)

    genus_list = []
    species_list = []

    for entry_list in info_list:
        entry = entry_list[0]
        if "genus" in entry:
            genus = re.findall("(?<=genus~~~)(.*?)(?=~~~)", entry)[0]
            genus_list.append(genus)

        elif "species" in entry:
            species = re.findall("(?<=species~~~)(.*?)(?=~~~)", entry)[0]
            species_list.append(species)
    if genus_list and species_list:
        final_genus = set(genus_list)
        final_species = set(species_list)
        return [final_species, final_genus]
    elif genus_list and not species_list:
        final_genus = set(genus_list)
        return ["", final_genus]
    elif not genus_list and species_list:
        final_species = set(species_list)
        return [final_species, ""]
    else:
        return ["", ""]


def write_to_file(output_path, info_list) -> None:
    """
    The write_to_file function writes the species and genus names to a file.
    The function takes two arguments: output_path, which is the path of the file we want to write
    to;
    and info_list, which is a list containing both species and genus names. The function returns
    None.

    :param output_path:str: Specify the path to the output file
    :param info_list:list[str]: Store the information that will be written to the file
    :return: None
    """
    species = info_list[0]
    genus = info_list[1]

    with open(output_path, "w", encoding="utf-8") as output_handler:
        output_handler.write(f"Species: {species}\n")
        output_handler.write(f"Genus: {genus}")


if __name__ == "__main__":
    input_file = sys.argv[1]
    output_file = sys.argv[2]
    write_to_file(output_file, get_abricate_info(input_file))
import csv
import re
from collections import Counter
import sys


def get_abricate_info(path):
    """
    The get_abricate_info function takes in a path to an abricate file and returns
    the most common species and genus found in that file. The function first opens
    the csv file, reads it into a list of lists, then iterates through each entry
    to find all entries with "genus" or "species" as their header. If they have
    "genus", then they are added to a list of genera. If they have "species", then
    they are added to a list of species. Once every entry has been processed, Counter is
    used to pick the most frequent species and genus.

    :param path:str: Specify the path to the abricate file
    :return: A list of two strings: the most common species and genus found in a given abricate file
    """

    with open(path, encoding="utf-8") as csv_handler:
        reader = csv.reader(csv_handler)
        info_list = list(reader)

    genus_list = []
    species_list = []

    for entry_list in info_list:
        entry = entry_list[0]
        if "genus" in entry:
            genus = re.findall("(?<=insaflu~~~)(.*?)(?=_genus)", entry)[0]
            genus_list.append(genus)

        elif "species" in entry:
            species = re.findall("(?<=insaflu~~~)(.*?)(?=_species)", entry)[0]
            species_list.append(species)
    if genus_list and species_list:
        final_genus = max(Counter(genus_list).items(), key=lambda x: x[1])[0]
        final_species = max(Counter(species_list).items(), key=lambda x: x[1])[0]
        return [final_species, final_genus]
    elif genus_list and not species_list:
        final_genus = max(Counter(genus_list).items(), key=lambda x: x[1])[0]
        return ["", final_genus]
    elif not genus_list and species_list:
        final_species = max(Counter(species_list).items(), key=lambda x: x[1])[0]
        return [final_species, ""]
    else:
        return ["", ""]


def write_to_file(output_path, info_list) -> None:
    """
    The write_to_file function writes the species and genus names to a file.
    The function takes two arguments: output_path, which is the path of the file we want to write
    to;
    and info_list, which is a list containing both species and genus names. The function returns
    None.

    :param output_path:str: Specify the path to the output file
    :param info_list:list[str]: Store the information that will be written to the file
    :return: None
    """
    species = info_list[0]
    genus = info_list[1]

    with open(output_path, "w", encoding="utf-8") as output_handler:
        output_handler.write(f"Species: {species}\n")
        output_handler.write(f"Genus: {genus}")


if __name__ == "__main__":
    input_file = sys.argv[1]
    output_file = sys.argv[2]
    write_to_file(output_file, get_abricate_info(input_file))
import sys
import re
from Bio import SeqIO

if __name__ == "__main__":
    consensus_files = sys.argv[1].split(" ")
    output_file = sys.argv[2]

    final_consensus_file = []
    for consensus_file in consensus_files:
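        # each per-sample alignment FASTA is expected to hold the reference first and the
        # sample consensus second, so index [1] picks the consensus record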
        aligned_sequence = list(SeqIO.parse(consensus_file, "fasta"))[1]
        aligned_sequence.id = f"{re.findall('(?<=align_samples/)(.*?)(?=/)',consensus_file)[0]}__{aligned_sequence.id}"
        final_consensus_file.append(aligned_sequence)
    with open(output_file, "w", encoding="utf-8") as handle_fasta_out_align:
        SeqIO.write(final_consensus_file, handle_fasta_out_align, "fasta")
from Bio import SeqIO
from optparse import OptionParser
import sys
import random
import os
import gzip
import glob


def get_locus_name_len(genbank_file):
    locus_name = []
    locus_len = []
    with open(genbank_file, encoding="utf-8") as handle_gb:
        for record in SeqIO.parse(handle_gb, "genbank"):
            locus_name.append(record.name)
            locus_len.append(str(len(record.seq)))
    return locus_name, locus_len


class Util(object):
    '''
    classdocs
    '''
    FORMAT_FASTA = "fasta"
    FORMAT_FASTQ = "fastq"
    EXTENSION_ZIP = ".gz"
    TEMP_DIRECTORY = "/tmp"
    COVERAGE_TEMP_DIRECTORY = "getCoverage"
    COUNT_DNA_TEMP_DIRECTORY = "count_dna_temp_directory"

    def __init__(self):
        '''
        Constructor
        '''
        pass

    def get_temp_file(self, file_name, sz_type):
        main_path = os.path.join(
            self.TEMP_DIRECTORY, self.COUNT_DNA_TEMP_DIRECTORY)
        if (not os.path.exists(main_path)):
            os.makedirs(main_path)
        while 1:
            return_file = os.path.join(main_path, "count_dna_" + file_name + "_" + str(
                random.randrange(100000, 999999, 10)) + "_file" + sz_type)
            if (not os.path.exists(return_file)):
                return return_file

    def is_integer(self, n_value):
        try:
            int(n_value)
            return True
        except ValueError:
            return False

    def is_gzip(self, file_name): return True if (
        file_name.rfind(".gz") == len(file_name) - 3) else False

    def __get_temp_file__(self, index_file_to_process, sz_type):
        main_path = os.path.join(
            self.TEMP_DIRECTORY, self.COVERAGE_TEMP_DIRECTORY)
        if (not os.path.exists(main_path)):
            os.makedirs(main_path)
        while 1:
            return_file = os.path.join(main_path, "seq_dna_" + str(index_file_to_process) + "_" + str(
                random.randrange(10000, 99999, 10)) + "_file." + sz_type)
            if (not os.path.exists(return_file)):
                return return_file


class CoverageElement(object):
    """
    Only have the number of reads and average
    """

    def __init__(self, element):
        self.element = element
        self.dt_data = {}

    def add_coverage(self, type_coverage, coverage):
        self.dt_data[type_coverage] = coverage

    def get_coverage(self, type_coverage):
        return self.dt_data.get(type_coverage, None)

    def __str__(self):
        return "Element: {}  {}".format(self.element, self.dt_data)


class Coverage(object):
    """
    Only have the number of reads and average
    """
    COVERAGE_ALL = "CoverageAll"
    COVERAGE_MORE_DEFINED_BY_USER = "CoverageMoreDefinedByUser"

    def __init__(self, genbank, limit_defined_by_user=10):
        self.limit_defined_by_user = limit_defined_by_user
        self.dt_data = {}
        self.ratio_value_defined_by_user = 0
        self.genbank = genbank

    def get_dict_data(self): return self.dt_data

    def add_coverage(self, element, type_coverage, coverage):
        if (element in self.dt_data):
            self.dt_data[element].add_coverage(type_coverage, coverage)
        else:
            self.dt_data[element] = CoverageElement(element)
            self.dt_data[element].add_coverage(type_coverage, coverage)

    def get_coverage(self, element, type_coverage):
        if (element in self.dt_data):
            return self.dt_data[element].get_coverage(type_coverage)
        raise Exception("Error: there's no key like this: " + element)

    def __str__(self):
        sz_return = "snps"
        names, _ = get_locus_name_len(self.genbank)
        for name in names:
            sz_return += f"\t{name}"

        sz_return += "\t"

        for key in self.dt_data:
            sz_return += "{}\t".format(
                self.get_coverage(
                    key, Coverage.COVERAGE_ALL))
            print(sz_return)

        for key in self.dt_data:
            sz_return += "{}\t".format(
                self.get_coverage(
                    key, Coverage.COVERAGE_MORE_DEFINED_BY_USER))

        return sz_return


class ParseFile(object):
    '''
    classdocs
    '''

    def __init__(self):
        '''
        Constructor
        '''
        self.data_file = None
        self.reference_dict = {}
        self.vect_reference = []
        self.util = Util()

    def is_gzip(self, file_name): return True if (
        file_name.rfind(".gz") == len(file_name) - 3) else False

    def parse_file(self, file_name):
        """
        """
        self.data_file = DataFile(file_name)
        if (self.util.is_gzip(file_name)):
            handle = gzip.open(file_name, mode='rt')
        else:
            handle = open(file_name)
        for line in handle:
            sz_temp = line.strip().lower()
            if (len(sz_temp) == 0 or sz_temp[0] == '#'):
                continue
            self.data_file.add_data(line)
        handle.close()
        return self.data_file

    def read_reference_fasta(self, reference_file):
        """
        check that the reference_file exists and read the length of each sequence
        """
        if (not os.path.exists(reference_file)):
            raise Exception(
                "Can't locate the reference file: '" + reference_file + "'")

        # set temp file name
        temp_file_name = reference_file

        # create temp file
        b_temp_file = False
        if self.util.is_gzip(reference_file):
            b_temp_file = True
            temp_file_name = self.util.get_temp_file(
                "reference_file_", ".fasta")
            cmd = "gzip -cd " + reference_file + " > " + temp_file_name
            os.system(cmd)

        for rec in SeqIO.parse(temp_file_name, 'fasta'):
            self.reference_dict[rec.id] = len(str(rec.seq))
            self.vect_reference.append(rec.id)

        # remove temp file if necessary
        if b_temp_file and temp_file_name:
            os.remove(temp_file_name)


class DataFile(object):
    '''
    classdocs
    '''
    util = Util()

    def __init__(self, file_name):
        '''
        Constructor
        '''
        self.file_name = file_name
        self.vect_chromosomes = []
        self.dict_data = {}
        self.dict_data_coverage = {}
        self.previous_position = -1

    def get_vect_chromosomes(self): return self.vect_chromosomes
    def get_dict_data(self): return self.dict_data

    def add_data(self, line):
        if (len(line) == 0 or line[0] == '#'):
            return
        vect_data = line.split()
        if (len(vect_data) != 3):
            raise Exception("File: " + self.file_name +
                            "\nThis line must have three values '" + line + "'")
        if (not self.util.is_integer(vect_data[1])):
            raise Exception("File: " + self.file_name + "\nLine: '" +
                            line + "'\nThe locus need to be integer")
        if (not self.util.is_integer(vect_data[2])):
            raise Exception("File: " + self.file_name + "\nLine: '" +
                            line + "'\nThe coverage need to be integer")
        if (vect_data[0] in self.dict_data):
            if (int(vect_data[1]) <= (self.previous_position)):
                raise Exception("File: " + self.file_name + "\nLine: '" + line +
                                "'\nThe locus need to be greater than the predecessor in the file")
            self.dict_data[vect_data[0]].append([vect_data[1], vect_data[2]])
            self.previous_position = int(vect_data[1])
        else:
            self.vect_chromosomes.append(vect_data[0])
            self.dict_data[vect_data[0]] = [[vect_data[1], vect_data[2]]]
            self.previous_position = int(vect_data[1])

    def get_coverage(self, sz_chromosome, length_chromosome):
        if (sz_chromosome not in self.dict_data):
            return 0
        if (sz_chromosome in self.dict_data_coverage):
            return self.dict_data_coverage[sz_chromosome]
        if (length_chromosome == 0):
            return 0
        # medaka sometimes creates bigger references than the original, difference 2 or 3 number of bases
        if (len(self.dict_data[sz_chromosome]) > (length_chromosome * 1.10) or
                len(self.dict_data[sz_chromosome]) < (length_chromosome - (length_chromosome * 0.10))):
            raise Exception("Chromosome '%s' has different sizes. Coverage: %d; Reference: %d" % (
                sz_chromosome, len(self.dict_data[sz_chromosome]), length_chromosome))
        sum_total = 0
        for data_ in self.dict_data[sz_chromosome]:
            sum_total += int(data_[1])
        self.dict_data_coverage[sz_chromosome] = sum_total / \
            float(length_chromosome)
        return self.dict_data_coverage[sz_chromosome]

    def get_ratio_more_than(self, sz_chromosome, length_chromosome, value):
        if (sz_chromosome not in self.dict_data):
            return 0
        if (length_chromosome == 0):
            return 0
        # medaka sometimes creates bigger references than the original, difference 2 or 3 number of bases
        if (len(self.dict_data[sz_chromosome]) > (length_chromosome * 1.10) or
                len(self.dict_data[sz_chromosome]) < (length_chromosome - (length_chromosome * 0.10))):
            raise Exception("Chromosome '%s' has different sizes. Coverage: %d; Reference: %d" % (
                sz_chromosome, len(self.dict_data[sz_chromosome]), length_chromosome))
        sum_total = 0
        for data_ in self.dict_data[sz_chromosome]:
            sum_total += (1 if (int(data_[1]) > value) else 0)
        return sum_total / float(length_chromosome)


class GetCoverage(object):
    """
    get coverage from deep.gz file
    need deep.gz file and reference
    """
    util = Util()

    def __init__(self):
        self.vect_files_processed = []
        self.reference_dict = {}
        self.vect_reference = []

    def get_dict_reference(self): return self.reference_dict
    def get_vect_reference(self): return self.vect_reference
    def get_vect_files_processed(self): return self.vect_files_processed

    def process_input_files(self, input_path):
        """
        test if input_path is a single file or a glob pattern and collect the matching files
        """

        print("Collecting files in: " + input_path)
        if (os.path.isfile(input_path)):
            if (os.path.exists(input_path)):
                self.vect_files_processed.append(input_path)
        else:
            self.vect_files_processed = glob.glob(input_path)

    def get_dict_with_coverage(self, deep_file):
        """
        get a dictionary of elements with coverage
        """
        self.reference_dict = {}
        self.vect_reference = []

        parse_file = ParseFile()
        data_file = parse_file.parse_file(deep_file)
        dt_out = {}
        for key in data_file.get_dict_data().keys():
            dt_out[key] = [int(value_[1])
                           for value_ in data_file.get_dict_data()[key]]
        return dt_out

    # def get_coverage(self, deep_file, reference, limit_defined_by_user = None, limit_defined_to_project = None):
    def get_coverage(self, deep_file, reference, limit_defined_by_user=10):
        """
        get an instance of coverage
        """
        self.reference_dict = {}
        self.vect_reference = []

        parse_file = ParseFile()
        data_file = parse_file.parse_file(deep_file)
        self.read_reference_fasta(reference)

        coverage = Coverage(genbank=options.genbank)
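        # note: `options` is the module-level OptionParser result created in __main__,
        # so get_coverage() only works when this script is executed directly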
        for chromosome in self.vect_reference:
            if (chromosome not in self.reference_dict):
                raise Exception("Can't locate the chromosome '" +
                                chromosome + "' in reference file")
            coverage.add_coverage(chromosome, Coverage.COVERAGE_ALL, "%.1f" % (
                data_file.get_coverage(chromosome, self.reference_dict[chromosome])))
            coverage.add_coverage(chromosome, Coverage.COVERAGE_MORE_DEFINED_BY_USER,
                                  "%.1f" % (data_file.get_ratio_more_than(chromosome, self.reference_dict[chromosome], limit_defined_by_user - 1) * 100))
        return coverage

    def read_reference_fasta(self, reference_file):
        """
        check that the reference_file exists and read the length of each sequence
        """
        if (not os.path.exists(reference_file)):
            raise Exception(
                "Can't locate the reference file: '" + reference_file + "'")

        # set temp file name
        temp_file_name = reference_file

        # create temp file
        b_temp_file = False
        if self.util.is_gzip(reference_file):
            b_temp_file = True
            temp_file_name = self.util.get_temp_file(
                "reference_file_", ".fasta")
            cmd = "gzip -cd " + reference_file + " > " + temp_file_name
            os.system(cmd)

        for rec in SeqIO.parse(temp_file_name, 'fasta'):
            self.reference_dict[rec.id] = len(str(rec.seq))
            self.vect_reference.append(rec.id)

        ###
        if b_temp_file and temp_file_name:
            os.remove(temp_file_name)


if __name__ == '__main__':

    """
    V0.6 release 21/11/2017
            add - 	ratio as a parameter
    V0.5 release 30/09/2017
            Fix - 	length chromosome
    V0.4 release 30/09/2017
            Fix - 	when coverage doesn't have at all coverage doesn't appear in the results
    V0.3 release 30/09/2017
            Fix - 	get coverage 0 for chromosomes that are missing in coverage file
    V0.2 release 30/09/2017
            Add - 	need the reference file
                            check if all chromosomes that are in the coverage are the ones that are in the reference.
                            get the size of the chromosomes from the reference
                            add chromosome length to the report
    V0.1 release 12/09/2017
            Add - coverage average base on the files <chromosome> <position> <deep coverage>:
    """

    b_debug = False
    if (b_debug):
        dir_path = os.path.dirname(os.path.realpath(__file__))
        input_file = os.path.join(dir_path, "test/files/*.depth")
        output_file = "tmp_out_get_coverage.csv"
        reference = os.path.join(dir_path, "test/files/ref/ref_H3.fasta")
        threshold = 10
        get_coverage = GetCoverage()
        get_coverage.process_input_files(input_file)
        handle = open(output_file, "w")
        for file_to_process in get_coverage.get_vect_files_processed():
            coverage = get_coverage.get_coverage(deep_file=file_to_process,
                                                 reference=reference, limit_defined_by_user=threshold)
            print(coverage)
            handle.write("{},{}\n".format(

                str(os.path.basename(file_to_process)).split(".")[0], coverage))
            # print("{},{}\n".format(str(os.path.basename(file_to_process)).split(".")[0],coverage))
        handle.close()
        sys.exit(1)
    else:
        parser = OptionParser(
            usage="%prog [-h] -i -r -o [-t]", version="%prog 0.6", add_help_option=False)
        parser.add_option("-i", "--input", type="string", dest="input",
                          help="Input file or path with coverage files. Can be zipped.", metavar="IN_FILE")
        parser.add_option("-r", "--reference", type="string", dest="reference",
                          help="Reference file to get the length of the chromosomes to check his name.", metavar="REF_FILE")
        parser.add_option("-g", "--genbank", type="string", dest="genbank",
                          help="Genbank file")

        parser.add_option("-o", "--output", type="string",
                          dest="output", help="Output file name", metavar="OUT_FILE")
        parser.add_option("-t", "--threshold", type="int", dest="threshold",
                          help="Minimum coverage per position (default 10)", metavar="OUT_FILE")
        parser.add_option('-h', '--help', dest='help',
                          action='store_true', help='show this help message and exit')

        (options, args) = parser.parse_args()

        if (options.help):
            parser.print_help()
            print("")
            print("\tCreate an output file with several averages about the coverage.")
            print("\tOnly runs in linux or mac.")
            print(
                "\texample: python getCoverage -i '/usr/local/zpto/*.gz' -r reference.fasta -o resultsOut.csv")
            print("\texample: python getCoverage -i '/usr/local/zpto/*.gz' -r reference.fasta -o resultsOut.csv -t 30")
            print("")
            print(
                "\tThe input coverage files must be in this format '<chromosome> <position> <deep coverage>'")
            sys.exit(0)

        if (len(args) != 0):
            parser.error(
                "incorrect number of arguments, no of arguments: " + str(len(args)))

        if not options.input:   #
            parser.error('Name of the input files/path is not specified')

        if not options.genbank:   #
            parser.error('Name of the genbank file is not specified')

        if not options.reference:   #
            parser.error('Name of the reference file not specified')

        if not options.output:   #
            parser.error('Output file is not specified')

        threshold = 10
        if (options.threshold):
            threshold = options.threshold
        locus_name, locus_len = get_locus_name_len(options.genbank)
        get_coverage = GetCoverage()
        get_coverage.process_input_files(options.input)
        handle = open(options.output, "w")
        # print(get_coverage.get_vect_files_processed())
        for file_to_process in get_coverage.get_vect_files_processed():
            coverage = get_coverage.get_coverage(deep_file=file_to_process,
                                                 reference=options.reference,
                                                 limit_defined_by_user=threshold)
            print(file_to_process)
            print(str(os.path.basename(file_to_process)).split(".")[0])
            handle.write("\n")
            handle.write("Chromosome\n")
            print(locus_name, locus_len)
            name = "Name\t" + "\t".join(locus_name)
            handle.write(name.strip() + "\n")
            lenght = "Lenght\t" + "\t".join(locus_len)
            handle.write(lenght.strip() + "\n")
            handle.write("\n")
            handle.write("Coverage\tRatio>0\tRatio>9\n")
            handle.write("{}\n".format(
                str(coverage).strip()))

            # print("{},{}".format(str(os.path.basename(file_to_process)).split(".")[0],coverage))
        handle.close()
def get_trimmomatic_parameters(software_parameters):
    trimmomatic_params = (
        f'ILLUMINACLIP {software_parameters["trimmo-ILLUMINACLIP"]} -phred33'
        if software_parameters["trimmo-ILLUMINACLIP"] is not None
        else "-phred33"
    )
    trimmomatic_params += (
        f' HEADCROP:{software_parameters["trimmo-HEADCROP"]}'
        if software_parameters["trimmo-HEADCROP"] is not None
        else ""
    )
    trimmomatic_params += (
        f' CROP:{software_parameters["trimmo-CROP"]}'
        if software_parameters["trimmo-CROP"] is not None
        else ""
    )
    trimmomatic_params += (
        f' SLIDINGWINDOW:{software_parameters["trimmo-SLIDINGWINDOW1"]}:{software_parameters["trimmo-SLIDINGWINDOW2"]}'
        if software_parameters["trimmo-SLIDINGWINDOW2"] is not None
        else ""
    )
    trimmomatic_params += (
        f' LEADING:{software_parameters["trimmo-LEADING"]}'
        if software_parameters["trimmo-LEADING"] is not None
        else ""
    )
    trimmomatic_params += (
        f' TRAILING:{software_parameters["trimmo-TRAILING"]}'
        if software_parameters["trimmo-TRAILING"] is not None
        else ""
    )
    trimmomatic_params += (
        f' MINLEN:{software_parameters["trimmo-MINLEN"]}'
        if software_parameters["trimmo-MINLEN"] is not None
        else ""
    )
    trimmomatic_params += " TOPHRED33"
    return trimmomatic_params
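
# Illustrative example (hypothetical parameter values): with
#   {"trimmo-ILLUMINACLIP": None, "trimmo-HEADCROP": 10, "trimmo-CROP": None,
#    "trimmo-SLIDINGWINDOW1": 4, "trimmo-SLIDINGWINDOW2": 20, "trimmo-LEADING": 3,
#    "trimmo-TRAILING": 3, "trimmo-MINLEN": 35}
# get_trimmomatic_parameters returns
#   "-phred33 HEADCROP:10 SLIDINGWINDOW:4:20 LEADING:3 TRAILING:3 MINLEN:35 TOPHRED33"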


def mask_regions_parameters(software_parameters):
    mask = (
        f'-s {software_parameters["MASK_SITES"]} '
        if software_parameters["MASK_SITES"] is not None
        else ""
    )
    mask += (
        f'-r {software_parameters["MASK_REGIONS"]} '
        if software_parameters["MASK_REGIONS"] is not None
        else ""
    )
    mask += (
        f'-b {software_parameters["MASK_F_BEGINNING"]} '
        if software_parameters["MASK_F_BEGINNING"] is not None
        else ""
    )
    mask += (
        f'-e {software_parameters["MASK_F_END"]} '
        if software_parameters["MASK_F_END"] is not None
        else ""
    )
    return mask


def get_snippy_parameters(software_parameters):
    snippy_parameters = (
        f' --mapqual {software_parameters["mapqual"]}'
        if software_parameters["mapqual"]
        else ""
    )
    snippy_parameters += (
        f' --mincov {software_parameters["mincov"]}'
        if software_parameters["mincov"]
        else ""
    )
    snippy_parameters += (
        f' --minfrac {software_parameters["minfrac"] }'
        if software_parameters["minfrac"]
        else ""
    )
    return snippy_parameters


def get_nanofilt_parameters(software_parameters):
    nanofilt_parameters = (
        f' -q {software_parameters["QUALITY"]}'
        if software_parameters["QUALITY"] is not None
        else ""
    )
    nanofilt_parameters += (
        f' -l {software_parameters["LENGTH"]}'
        if software_parameters["LENGTH"] is not None
        else ""
    )
    nanofilt_parameters += (
        f' --headcrop {software_parameters["HEADCROP"]}'
        if software_parameters["HEADCROP"] is not None
        else ""
    )
    nanofilt_parameters += (
        f' --tailcrop {software_parameters["TAILCROP"]}'
        if software_parameters["TAILCROP"] is not None
        else ""
    )
    return nanofilt_parameters
from typing import Optional
import pandas
from scripts.yaml_io import read_yaml

dic_directory = read_yaml("config/constants.yaml")


class Data:
    """Object with sample data information"""

    def __init__(self, file: str, consensus_illumina) -> None:
        self.user_df: pandas.DataFrame = pandas.read_csv(file)
        self.consensus_illumina = consensus_illumina

    def get_sample_names(self) -> list[str]:
        """
        The get_sample_names function returns a list of sample names from the
        user_df dataframe.


        :param self: Access the attributes and methods of the class in python
        :return: A list of the sample names in the user_df
        :doc-author: Trelent
        """
        return list(self.user_df["sample_name"])

    def get_sample_1(self) -> list[str]:
        """
        The get_sample_1 function returns a list of the first fastq file of each sample.
        This is accomplished by returning the values in the "fastq1" column
        of the user_df dataframe.

        :param self: Access the attributes and methods of the class in python
        :return: A list of the fastq file names from the user_df dataframe
        :doc-author: Trelent
        """
        return list(self.user_df["fastq1"])

    def get_sample_2(self) -> list[Optional[str]]:
        """
        The get_sample_2 function returns a list of the second fastq file for each sample.
        This is useful when you want to run a program that requires paired-end reads.

        :param self: Access the attributes and methods of the class in python
        :return: A list of the values in the column &quot;fastq2&quot; from the user_df
        :doc-author: Trelent
        """
        return list(self.user_df["fastq2"])

    def get_sample_type(self) -> dict[str, str]:
        """
        The get_sample_type function returns a dictionary mapping each sample name to the
        consensus tool used for it: "medaka" for ONT samples and the configured Illumina
        consensus tool (snippy or iVar) otherwise.

        :param self: Reference the class instance
        :return: A dictionary with sample names as keys and the corresponding consensus tool as values
        :doc-author: Trelent
        """
        names = self.get_sample_names()
        tech_type: pandas.Series = self.user_df["tech"]
        type_dic: dict[str, str] = {}
        for index, _ in enumerate(names):
            type_dic[names[index]] = (
                "medaka" if tech_type[index] == "ont" else self.consensus_illumina
            )
        return type_dic

    def get_dic(self) -> dict[str, dict[str, Optional[str]]]:
        """
        The get_dic function creates a dictionary of dictionaries. The outer dictionary is keyed
        by sample names, and the inner
        dictionary contains keys for each column in the user_df dataframe. For example:
        {'sample_name': {'fastq1': 'path/to/file', 'fastq2': 'path/to/file', ...}, ...}

        :param self: Refer to the object that is calling the function
        :return: A dictionary with the sample names as keys and a subdictionary of fastq files,
        tech, and species
        :doc-author: Trelent
        """
        dic: dict = {}
        names = self.get_sample_names()
        for name in names:
            dic[name] = {}

        for idx, sample_name_1 in enumerate(self.get_sample_1()):
            dic[names[idx]]["fastq1"] = (
                sample_name_1 if isinstance(sample_name_1, str) else None
            )

        for idx, sample_name_2 in enumerate(self.get_sample_2()):
            dic[names[idx]]["fastq2"] = (
                sample_name_2 if isinstance(sample_name_2, str) else None
            )

        for idx, tech in enumerate(list(self.user_df["tech"])):
            dic[names[idx]]["tech"] = tech if isinstance(tech, str) else None

        return dic

    def get_options(self) -> list[Optional[dict[str, dict[str, Optional[str]]]]]:
        """
        The get_options function returns a list of dictionaries.
        The first dictionary holds the paired-end Illumina samples, the second the
        single-end Illumina samples, and the third the ONT samples. The fourth
        dictionary contains every sample of the project and is used to create the
        yaml files consumed by Snakemake.

        :param self: Access the attributes and methods of the class in python
        :return: A list of dictionaries
        :doc-author: Trelent
        """
        project_dic = self.get_dic()
        final_dictionary: Optional[dict[str, dict[str, Optional[str]]]] = {}
        single_illumina: Optional[dict[str, dict[str, Optional[str]]]] = {}
        paired_illumina: Optional[dict[str, dict[str, Optional[str]]]] = {}
        ont_samples = {}
        for names, _ in project_dic.items():
            if (
                project_dic[names]["fastq1"]
                and project_dic[names]["fastq2"]
                and project_dic[names]["tech"] == "illumina"
            ):
                paired_illumina[names] = project_dic[names]
                final_dictionary[names] = project_dic[names]
            elif (
                project_dic[names]["fastq1"]
                and not project_dic[names]["fastq2"]
                and project_dic[names]["tech"] == "illumina"
            ):
                single_illumina[names] = project_dic[names]
                final_dictionary[names] = project_dic[names]
            elif (
                project_dic[names]["fastq1"]
                and not project_dic[names]["fastq2"]
                and project_dic[names]["tech"] == "ont"
            ):
                ont_samples[names] = project_dic[names]
                final_dictionary[names] = project_dic[names]
        return [
            paired_illumina,
            single_illumina,
            ont_samples,
            final_dictionary,
        ]

    def get_assembler(self) -> str:
        """
        The get_assembler function returns the assembler used in the pipeline.


        :param self: Access the attributes and methods of the class in python
        :return: The assembler that is being used in the current instance of the class
        :doc-author: Trelent
        """
        return self.consensus_illumina


def get_data_in_align_form(
    assembler: str,
    single_illumina: dict[str, Optional[str]],
    paired_illumina: dict[str, Optional[str]],
) -> list[dict[str, Optional[str]]]:
    """
    The get_data_in_align_form function takes the assembler name and the single_illumina
    and paired_illumina dictionaries. It checks whether the assembler is snippy or iVar
    and routes the samples accordingly: with snippy the input dictionaries are returned in
    the snippy slots and the iVar slots are empty; with iVar it is the other way around.

    :param assembler:str: Determine which assembler to use
    :param single_illumina:dict[str, Optional[str]]: The single-end Illumina samples
    :param paired_illumina:dict[str, Optional[str]]: The paired-end Illumina samples
    :return: A list of four dictionaries
    :doc-author: Trelent
    """
    paired_illumina_snippy: dict[str, Optional[str]] = {}
    single_illumina_snippy: dict[str, Optional[str]] = {}
    paired_illumina_ivar: dict[str, Optional[str]] = {}
    single_illumina_ivar: dict[str, Optional[str]] = {}
    if assembler == "snippy":
        paired_illumina_snippy = paired_illumina
        single_illumina_snippy = single_illumina
        paired_illumina_ivar = {}
        single_illumina_ivar = {}
    elif assembler == "iVar":
        paired_illumina_ivar = paired_illumina
        single_illumina_ivar = single_illumina
        paired_illumina_snippy = {}
        single_illumina_snippy = {}
    return [
        paired_illumina_snippy,
        single_illumina_snippy,
        paired_illumina_ivar,
        single_illumina_ivar,
    ]
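

# Illustrative example (hypothetical sample names): with assembler "snippy",
#   get_data_in_align_form("snippy", {"s1": {...}}, {"s2": {...}})
# returns [{"s2": {...}}, {"s1": {...}}, {}, {}] - the paired and single Illumina samples
# land in the snippy slots and the iVar slots stay empty.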
from Bio import SeqIO


def detach_reference(input_file, ref):
    """
    The detach_reference function takes a fasta file, removes its first record (the reference)
    and saves that record to a separate file. The input file is rewritten in place without the
    reference sequence.

    :param input_file: Specify the file to be parsed
    :param ref: Specify the name of the file that will be created to store the reference sequence
    :return: None
    :doc-author: Trelent
    """
    with open(input_file, "r", encoding="utf8") as handle_input:
        record_list = list(SeqIO.parse(handle_input, "fasta"))
        reference = record_list.pop(0)
    with open(input_file, "w", encoding="utf8") as handle_input_write:
        SeqIO.write(record_list, handle_input_write, "fasta")

    with open(ref, "w", encoding="utf8") as handle_reference:
        SeqIO.write(reference, handle_reference, "fasta")


def attach_reference(input_file, ref):
    """
    The attach_reference function takes a fasta file and attaches the reference sequence to it.
    It is used in the main function to attach the reference sequence to each of the input files.

    :param input_file: Specify the name of the file that contains the sequence records
    :param ref: Specify the reference sequence
    :return: The input file with the reference sequence at the beginning of the list
    :doc-author: Trelent
    """
    with open(ref, "r", encoding="utf8") as handle:
        reference = list(SeqIO.parse(handle, "fasta"))[0]
    with open(input_file, "r", encoding="utf8") as handle:
        records = list(SeqIO.parse(handle, "fasta"))
        records.insert(0, reference)
    with open(input_file, "w", encoding="utf8") as handle:
        SeqIO.write(records, handle, "fasta")
import sys
from Bio import SeqIO

if __name__ == "__main__":
    reference_fasta = sys.argv[1]
    consensus_file = sys.argv[2]
    output_file = sys.argv[3]
    segment_name = sys.argv[4]

    with open(reference_fasta, "r", encoding="utf-8") as handle_fasta:
        dt_consensus = SeqIO.to_dict(SeqIO.parse(consensus_file, "fasta"))
        for record in SeqIO.parse(handle_fasta, "fasta"):
            if record.id == segment_name:  # make mask
                # get sequences
                vect_out_fasta_to_align = []
                record_id = record.id
                record.id = record_id + "_ref"  # tag the reference copy of this segment
                vect_out_fasta_to_align.append(record)
                vect_out_fasta_to_align.append(dt_consensus[record_id])
                with open(output_file, "w", encoding="utf-8") as handle_fasta_out_align:
                    SeqIO.write(
                        vect_out_fasta_to_align, handle_fasta_out_align, "fasta"
                    )
import argparse

from Bio import SeqIO
from Bio.Seq import MutableSeq


def compute_masking_sites(
    sequence,
    ranges=None,
    single_positions=None,
    from_beggining=None,
    from_end=None,
):
    length = len(sequence)
    masking_sites = []
    if ranges is not None:
        for pos in ranges:
            pos[0] = int(pos[0])
            pos[1] = int(pos[1])
            if pos[1] < length and pos[1] > pos[0]:
                masking_sites.extend(iter(range(pos[0] - 1, pos[1] - 1)))
    if single_positions is not None:
        for pos in single_positions:
            pos = int(pos)
            if pos < length:
                masking_sites.append(pos - 1)
    if from_beggining is not None:
        masking_sites.extend(iter(range(int(from_beggining))))
    if from_end is not None:
        masking_sites.extend(iter(range(length - int(from_end), length)))
    return masking_sites
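

# Illustrative example (hypothetical values): for a 100 bp sequence,
#   compute_masking_sites(seq, ranges=[["10", "20"]], single_positions=["5"],
#                         from_beggining=3, from_end=2)
# marks the 0-based positions 9-18, 4, 0-2 and 98-99 for masking.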


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("consensus", type=str, help="Consensus File Path")
    parser.add_argument("output", type=str, help="Output File Path")
    parser.add_argument(
        "-r",
        "--regions",
        type=str,
        help="Pass a string with the format x-y,n-z,...",
    )
    parser.add_argument(
        "-s",
        "--single",
        type=str,
        help="Pass a string with the format x,y,n,z,...",
    )
    parser.add_argument(
        "-b", "--beggining", type=int, help="Pass a integer with the format x"
    )
    parser.add_argument(
        "-e", "--end", type=int, help="Pass a integer with the format x"
    )

    args = parser.parse_args()

    ranges = None
    consensus = args.consensus or None
    output = args.output or None  # positional arguments, so argparse already enforces them
    single_positions = args.single.split(",") if args.single else None
    if args.regions:
        ranges = [x.split("-") for x in args.regions.split(",")]
    from_beggining = args.beggining or None
    from_end = args.end or None
    final_mask_consensus = []

    for record in SeqIO.parse(consensus, "fasta"):
        sequence = MutableSeq(record.seq)
        masking_sites = compute_masking_sites(
            sequence, ranges, single_positions, from_beggining, from_end
        )
        # Taken from insaflu
        ref_pos = 0
        ref_insertions = 0
        # for _ in range(len(sequence)):
        #     if sequence[_] == "-":
        #         ref_insertions += 1
        #         continue
        #     if ref_pos in masking_sites:
        #         sequence[ref_pos + ref_insertions] = "N"
        #     ref_pos += 1
        #     if (ref_pos + ref_insertions) >= len(record.seq):
        #         break
        for position in masking_sites:
            sequence[position] = "N"
        # End of insaflu code
        record.seq = sequence
        final_mask_consensus.append(record)

    with open(output, "w") as handle_fasta_out:
        SeqIO.write(final_mask_consensus, handle_fasta_out, "fasta")
import re
import sys
import csv
import yaml

from Bio import SeqIO


def get_locus(genbank_file):
    """
    The get_locus function takes a genbank file and returns the name of each record in it:
    a single-record file yields a one-element list, while a multi-segment reference yields
    one name per segment.

    :param genbank_file: Open the genbank file, and then parse it using seqio
    :return: A list of record (locus) names
    :doc-author: Trelent
    """
    locus = []
    with open(genbank_file, encoding="utf-8") as handle_gb:
        for record in SeqIO.parse(handle_gb, "genbank"):
            locus.append(record.name)
    return locus


def get_locus_len(genbank_file):
    """
    The get_locus function takes a genbank file and returns the locus number of that record.
    If there is only one record in the genbank file, it will return the possible_name.
    If there are multiple records, it will return a list of all locus numbers.

    :param genbank_file: Open the genbank file, and then parse it using seqio
    :param possible_name: Determine if the file is a single genbank file or not
    :return: A list of locus numbers
    :doc-author: Trelents
    """
    locus = []
    with open(genbank_file, encoding="utf-8") as handle_gb:
        for record in SeqIO.parse(handle_gb, "genbank"):
            locus.append(len(record.seq))
    return locus


def read_yaml(yaml_file_path: str) -> dict:
    """
    The read_yaml function reads a yaml file and returns the contents as a dictionary.
    The read_yaml function accepts one argument, which is the path to the yaml file.

    :param yaml_file_path: Specify the path to the yaml file that is going to be read
    :return: A dictionary
    :doc-author: Trelent
    """
    with open(yaml_file_path, encoding="utf-8") as yaml_file:
        return yaml.load(yaml_file, Loader=yaml.FullLoader)


def merge_coverage(file_list, output_file):
    """
    The merge_coverage function takes a list of coverage files and merges them into one file.
    The first argument is the list of files, the second argument is the name for the output file.

    :param file_list: The list of per-sample coverage files to merge
    :param output_file: Path of the merged coverage CSV to write
    :return: None; the merged coverage table is written to output_file
    :doc-author: Trelent
    """

    species = get_locus(REFERENCE_GB)
    length = get_locus_len(REFERENCE_GB)
    super_header = ["Name", *species, "", ""]
    super_header_1 = ["Length", *length, "", ""]

    software_parameters = read_yaml(dic_directory["software_parameters"])

    header = [
        "SAMPLES",
        "Mean depth of coverage",
        r"% of size covered by at least 1-fold",
        r"% of size covered by at least X-fold",
    ]
    sub_header = ["", *species, "", *species, "X-Fold", *species]
    final_output = [super_header, super_header_1, header, sub_header]

    for file in file_list:
        with open(file, "r", encoding="utf8") as f:
            sample_name = re.findall("(?<=/)(.*?)(?=_coverage.csv)", file)[0].split(
                "/"
            )[0]
            info = f.readlines()[6].split()
            sample_t = sample_type[sample_name]
            if sample_t in ("snippy", "iVar"):
                sample_cov = software_parameters["mincov"]
            elif sample_t == "medaka":
                sample_cov = software_parameters["mincov_medaka"]
            else:
                raise Exception(
                    "There is no implementation for other tools than snippy, iVar and medaka."
                )
            info[0] = sample_name
            info.insert(1 + len(species), "")
            info.insert(2 + len(species) * 2, sample_cov)
            final_output.append(info)

    with open(output_file, mode="w", encoding="utf8") as f:
        f_writer = csv.writer(
            f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL
        )
        f_writer.writerows(final_output)


def get_coverage(filename: str, n_locus: int):
    """
    The get_coverage function takes a filename as an argument and returns the coverage of each
    locus in that file.
    The function also takes n_locus as an argument, which is the number of loci in the file.

    :param filename: Specify the file to be read
    :param n_locus: Specify the number of loci in the file
    :return: A list of the coverage for each locus in a given sample
    :doc-author: Trelent
    """
    with open(filename, newline="", encoding="UTF8") as csvfile:
        coverage_list = list(csv.reader(csvfile, delimiter="\t"))
        sample_name = re.findall("(?<=/)(.*?)(?=_coverage.csv)", filename)[0].split(
            "/"
        )[0]
        # for i in coverage_list:
        #     print(i)
        coverage_list = coverage_list[-1][-n_locus:]
        coverage_list.insert(0, sample_name)
    return coverage_list


def create_script_readable_coverage_file(coverage_files, output, n_locus):
    """
    The create_script_readable_coverage_file function takes a list of coverage files and outputs
    a script readable coverage file. The output is the name of the file you want to create.
    The last argument is n_locus which specifies how many loci are in each coverage file.

    :param coverage_files: Specify the files that contain the coverage data
    :param output: Specify the name of the output file
    :param n_locus: Indicate the number of locus in the genome
    :return: None; writes a CSV in which each row holds the per-locus coverage of one sample
    :doc-author: Trelent
    """
    coverage_list = coverage_files

    n_locus = int(n_locus)
    coverage_translate = []
    for filename in coverage_list:
        coverage_translate.append(get_coverage(filename, n_locus))

    with open(output, mode="w", encoding="UTF8") as output_file:
        f_writer = csv.writer(output_file, delimiter=",")
        f_writer.writerows(coverage_translate)


if __name__ == "__main__":
    COVERAGE_FILES = sys.argv[1].split()
    OUTPUT_FILE_1 = sys.argv[2]
    OUTPUT_FILE_2 = sys.argv[3]
    REFERENCE_GB = sys.argv[4]

    dic_directory = read_yaml("../config/constants.yaml")
    sample_type = read_yaml(dic_directory["config_file"])["sample_type"]

    NUMBER_OF_LOCUS = len(get_locus(REFERENCE_GB))
    merge_coverage(COVERAGE_FILES, OUTPUT_FILE_1)
    create_script_readable_coverage_file(
        COVERAGE_FILES, OUTPUT_FILE_2, NUMBER_OF_LOCUS)
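A rough invocation sketch (all paths are assumptions). The first argument is a single space-separated string of per-sample coverage files, followed by the two output CSVs and the reference GenBank file; the script also resolves ../config/constants.yaml relative to the working directory and looks each sample up in the user config, so it only runs meaningfully inside a prepared workflow directory.

import subprocess

# Hypothetical example; sample names, paths and the script name are assumptions.
coverage_files = "samples/sample1/sample1_coverage.csv samples/sample2/sample2_coverage.csv"
subprocess.run(
    [
        "python", "merge_coverage.py",  # assumed name for the script above
        coverage_files,                  # one string with all coverage files, space separated
        "coverage_regular.csv",          # merged, human-readable coverage table
        "coverage_translate.csv",        # script-readable per-locus coverage matrix
        "reference.gbk",                 # reference GenBank file
    ],
    check=True,
)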
import sys
import os
from yaml_io import read_yaml

if __name__ == "__main__":
    dic_directory: dict[str, str] = read_yaml("../config/constants.yaml")

    SAMPLE = sys.argv[1]

    config_user = read_yaml(f"{dic_directory['config_file']}")

    fastq1 = config_user["samples"][SAMPLE]["fastq1"]
    fastq2 = config_user["samples"][SAMPLE]["fastq2"]

    if fastq1 and fastq2:
        run = f"mv ./samples/{SAMPLE}/raw_fastqc/{fastq1}_fastqc.html ./samples/{SAMPLE}/raw_fastqc/{SAMPLE}_1_fastqc.html"
        os.system(f"{run}")
        run = f"mv ./samples/{SAMPLE}/raw_fastqc/{fastq2}_fastqc.html ./samples/{SAMPLE}/raw_fastqc/{SAMPLE}_2_fastqc.html"
        os.system(f"{run}")
        run = f"mv ./samples/{SAMPLE}/raw_fastqc/{fastq1}_fastqc.zip ./samples/{SAMPLE}/raw_fastqc/{SAMPLE}_1_fastqc.zip"
        os.system(f"{run}")
        run = f"mv ./samples/{SAMPLE}/raw_fastqc/{fastq2}_fastqc.zip ./samples/{SAMPLE}/raw_fastqc/{SAMPLE}_2_fastqc.zip"
        os.system(f"{run}")
    elif fastq1:
        run = f"mv ./samples/{SAMPLE}/raw_fastqc/{fastq1}_fastqc.html ./samples/{SAMPLE}/raw_fastqc/{SAMPLE}_fastqc.html"
        os.system(f"{run}")
        run = f"mv ./samples/{SAMPLE}/raw_fastqc/{fastq1}_fastqc.zip ./samples/{SAMPLE}/raw_fastqc/{SAMPLE}_fastqc.zip"
        os.system(f"{run}")
import os
import re
import gzip
import codecs
import csv
import argparse
import datetime as dt

from Bio import SeqIO


def import_seqs(fasta_file):
    """Imports sequences from a FASTA file.

    Parameters
    ----------
    fasta_file : str
            Path to the FASTA file.

    Returns
    -------
    seqs : list of list
    List with one sublist per sequence in the
    FASTA file. Each sublist has two elements:
    The identifier/header of the sequence  (str)
    and the sequence (str).
    """

    seqs = []
    for record in SeqIO.parse(fasta_file, "fasta"):
        seqid = record.id
        sequence = str(record.seq)
        seqs.append([seqid, sequence])

    return seqs


def read_depth_file(depth_file, sample_map):
    """read depth file"""
    with gzip.open(depth_file, "rb") if depth_file.endswith(".gz") else open(
        depth_file, "r"
    ) as df:
        lines = (
            csv.reader(codecs.iterdecode(df, "utf-8"), delimiter="\t")
            if depth_file.endswith(".gz")
            else csv.reader(df, delimiter="\t")
        )
        for l in lines:
            if len(l) < 3 or len(l[0].strip()) == 0:
                continue
            if l[0] in sample_map:
                sample_map[l[0]].append([l[0], l[1], l[2]])
            else:
                sample_map[l[0]] = [[l[0], l[1], l[2]]]
    return sample_map


def import_depth(depth_data, sample_ids=[]):
    """Import depth files,

    Parameters
    ----------
    depth_data : str
            Path to the depth files or file itself, can be in gz format.

    Returns
    -------
    sample_map : dictionary with name of sequence and list of depth
    """
    sample_map = {}
    # list depth files and match sample to depth file
    if os.path.isdir(depth_data) is True:
        depth_files = os.listdir(depth_data)
        sample_map = {}
        for i in sample_ids:
            match = [f for f in depth_files if i in f]
            if len(match) > 0:
                file = os.path.join(depth_data, match[0])
                sample_map = read_depth_file(file, sample_map)
            else:
                print("Could not find a depth file for " "sample {0}".format(i))
    elif os.path.isfile(depth_data) is True:
        sample_map = read_depth_file(depth_data, sample_map)
    return sample_map


def process_data(reference_sequence, samples, sample_map, cutoff, mask_gaps):
    """Mask sample sequences,

    Parameters
    ----------
    reference_sequence : str
            Aligned reference sequence (may contain '-' gap characters).
    samples : list
            list of all samples, [[<name>, <sequence>], ...]
    sample_map : dictionary
            dictionary with name of sequence and list of depth
    Returns
    -------
            None
    """

    # determine gaps in reference
    # match '-' in reference
    ref_gaps = list(re.finditer("(-)+", reference_sequence))
    ref_gaps = [s.span() for s in ref_gaps]
    gaps = {s[0]: s[1] - s[0] for s in ref_gaps}

    for i, sample in enumerate(samples):
        seqid = sample[0]
        sequence = sample[1]

        # determine initial and final subsequences with '-'
        left_trimmed = sequence.lstrip("-")
        left_pos = len(sequence) - len(left_trimmed)
        right_trimmed = sequence.rstrip("-")
        right_pos = len(sequence) - (len(sequence) - len(right_trimmed))
        assert seqid in sample_map, "Error, SeqId-{} not in depth file".format(seqid)
        depth_info = sample_map[seqid]

        # get positions with zero coverage
        zero_cov = [d for d in depth_info if int(d[2]) <= cutoff]

        # shift low depth positions based on gaps on reference
        for p in gaps:
            current_p = p
            shift = gaps[p]
            zero_cov = [
                [z[0], str(int(z[1]) + shift), z[2]]
                if int(z[1]) - 1 >= current_p
                else [z[0], z[1], z[2]]
                for z in zero_cov
            ]

        # after adjusting for gaps on reference we can remove
        # positions on trimmed regions
        zero_cov = [
            z for z in zero_cov if int(z[1]) > left_pos and int(z[1]) <= right_pos
        ]

        total = len(zero_cov)
        print(
            "\nFound a total of {0} positions with "
            "low depth for sample {1}.\n".format(total, seqid)
        )

        print("Masking low depth positions in {0}...".format(seqid))

        splitted = list(sequence)

        masked = 0
        zero_pos = []
        for pos in zero_cov:
            # Python indexing starts at 0
            low_cov_pos = int(pos[1]) - 1
            current = splitted[low_cov_pos]
            if current != "-" or mask_gaps is True:
                splitted[low_cov_pos] = "N"
                # 				print('{0}: {1} --> N'.format(pos[1], current))
                zero_pos.append(pos[1])
                masked += 1

        print("Masked {0}/{1}".format(masked, total))

        samples[i][1] = "".join(splitted)


# 		# save info about positions below coverage
# 		with open(pos_file, 'a') as pf:
# 			pos_info = '{0}\t{1}\t{2}\n'.format(seqid, total,
# 												','.join(zero_pos))
# 			pf.write(pos_info)


def main(input_fasta, depth_data, output_fasta, cutoff, mask_gaps):
    start_date = dt.datetime.now()
    start_date_str = dt.datetime.strftime(start_date, "%Y-%m-%dT%H:%M:%S")
    print("Started at: {0}\n".format(start_date_str))

    # import sequences in MSA FASTA file
    msa_seqs = import_seqs(input_fasta)

    # keep reference and samples separate
    reference = msa_seqs[0]
    reference_id = reference[0]
    samples = msa_seqs[1:]
    sample_ids = [s[0] for s in samples]
    print("Reference:\n{0}".format(reference_id))
    print(
        "\nAligned samples:\n{0}\nTotal: {1}".format(
            "\n".join(sample_ids), len(sample_ids)
        )
    )

    # read depth information
    sample_map = import_depth(depth_data, sample_ids)

    # 	out_dir = os.path.dirname(output_fasta)
    # 	pos_basename = os.path.basename(output_fasta).split('.fasta')[0]
    # 	pos_file = os.path.join(out_dir, pos_basename+'_pos')

    # process data
    process_data(reference[1], samples, sample_map, cutoff, mask_gaps)

    # write masked sequences to FASTA
    with open(output_fasta, "w") as out:
        reference_record = [">{0}\n{1}".format(reference[0], reference[1])]
        samples_records = [">{0}\n{1}".format(s[0], s[1]) for s in samples]
        records = reference_record + samples_records
        out_text = "\n".join(records)
        out.write(out_text)

    end_date = dt.datetime.now()
    end_date_str = dt.datetime.strftime(end_date, "%Y-%m-%dT%H:%M:%S")

    delta = end_date - start_date
    minutes, seconds = divmod(delta.total_seconds(), 60)

    print("\nFinished at: {0}".format(end_date_str))
    print("Elapsed time: {0:.0f}m{1:.0f}s".format(minutes, seconds))


def parse_arguments():
    def msg(name=None):
        # simple command with default options
        simple_cmd = (
            "   python msa_low_cov_masker.py -i input.fasta "
            "-df depth_files -o out.fasta"
        )

        # different depth of coverage value
        depth_command = (
            "  python msa_low_cov_masker.py -i input.fasta "
            "-df depth_files -o out.fasta --c 10"
        )

        # mask gaps contained in sequences
        gaps_command = (
            "  python msa_low_cov_masker.py -i input.fasta "
            "-df depth_files -o out.fasta --c 10 --g"
        )

        usage_msg = (
            "\nSimple command with default options:\n\n{0}\n"
            "\nDifferent depth of coverage value:\n\n{1}\n"
            "\nMask gaps in the middle of aligned sequences:"
            "\n\n{2}\n".format(simple_cmd, depth_command, gaps_command)
        )

        return usage_msg

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        usage=msg(),
    )

    parser.add_argument(
        "-i",
        type=str,
        required=True,
        dest="input_fasta",
        help="Path to the input FASTA file that contains "
        "the Multiple Sequence Alignment.",
    )

    parser.add_argument(
        "-df",
        type=str,
        required=True,
        dest="depth_files",
        help="Path to the directory with TSV "
        "files that contain depth of "
        "sequencing data. One file per "
        "sample (files must contain the "
        "identifier of the sample in the name).",
    )

    parser.add_argument(
        "-o",
        type=str,
        required=True,
        dest="output_fasta",
        help="Path to the output FASTA file "
        "to which the masked sequences "
        "will be saved.",
    )

    parser.add_argument(
        "--c",
        type=int,
        required=False,
        default=0,
        dest="cutoff",
        help="Positions with a depth value equal "
        "or below the value of this argument "
        "will be substituted by N (default=0).",
    )

    parser.add_argument(
        "--g",
        required=False,
        action="store_true",
        dest="mask_gaps",
        help="If the process should mask gaps (-) " "with low depth (default=False).",
    )

    args = parser.parse_args()

    return [
        args.input_fasta,
        args.depth_files,
        args.output_fasta,
        args.cutoff,
        args.mask_gaps,
    ]


if __name__ == "__main__":
    args = parse_arguments()
    main(args[0], args[1], args[2], args[3], args[4])
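The depth files this masker consumes are plain TSV: one line per position with the sequence identifier, the position, and the depth at that position, one file per sample. A tiny sketch of the expected format, assuming the script above is importable (the sequence identifier and file name are assumptions):

# Build a two-position depth file and parse it with read_depth_file from the script above.
sample_map = {}
with open("sample1.depth", "w", encoding="utf-8") as handle:
    handle.write("MN908947\t1\t0\n")    # position 1 has depth 0 -> candidate for masking at the default cutoff
    handle.write("MN908947\t2\t15\n")   # position 2 has depth 15 -> kept

sample_map = read_depth_file("sample1.depth", sample_map)
# sample_map == {"MN908947": [["MN908947", "1", "0"], ["MN908947", "2", "15"]]}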
import re
import csv
import sys


def get_info_dic(info_list):
    """
    The get_info_dic function takes a list of strings and returns a dictionary with the first item in each string as the key and the second as its value.
    For example, if info_list = ['ID=gene:Solyc00g005000.2.3', 'Name=Potri.001G000100'] then get_info_dic(info_list) will return {'ID':'gene:Solyc00g005000.2.3', 'Name':'Potri.001G000100'}.


    :param info_list: Create a dictionary of the information contained in the info column
    :return: A dictionary of the information in each line
    :doc-author: Trelent
    """
    info_dic = {}
    for item in info_list:
        pieces = item.split("=")
        info_dic[pieces[0]] = pieces[1]
    return info_dic


def get_row_dict(row):
    """
    The get_row_dict function takes a row from the vcf file and returns a dictionary with the following keys:
        CHROM, POS, ID, REF, ALT.


    :param row: Pass the row of data from the vcf file to the function
    :return: A dictionary with the chrom, pos, id, ref and alt values from a row in the vcf file
    :doc-author: Trelent
    """
    row_dic = {
        "CHROM": row[0],
        "POS": row[1],
        "ID": row[2],
        "REF": row[3],
        "ALT": row[4],
        "QUAL": row[5],
    }
    return row_dic


def create_graph_csv(files_path, output_file):
    """
    The create_graph_csv function creates a csv file with the following information:
        - Sample name
        - Number of variants with frequency less than 50% (less_50)
        - Number of variants between 90 and 50% (between_90_50)

    :param files_path: Pass the path of the files that will be used to create a csv file
    :param output_file: Specify the name of the output file
    :return: A list of lists with the number of variants that are less than 50% frequency and between 90% and 50%
    :doc-author: Trelent
    """
    file_final = [["Sample", "Less 50", "Between 90 and 50"]]
    for file in files_path:
        count_less = 0
        count_more_50_less_90 = 0
        with open(file, "r") as invcf:
            for line in invcf:
                try:
                    if line.startswith("#"):
                        continue
                    line = line.strip().split()
                    info = line[7].split(";")
                    info_dic = get_info_dic(info)
                    row_dic = get_row_dict(line)
                    if "ANN" not in info_dic:
                        info_dic["ANN"] = "|||||||||||||||||||||||||||||||||||||||"
                        info_dic["FTYPE"] = ""
                    else:
                        info_dic["FTYPE"] = "CDS"

                    if (
                        "snp" in info_dic["TYPE"]
                        or "del" in info_dic["TYPE"]
                        or "ins" in info_dic["TYPE"]
                    ):
                        freq = round(float(info_dic["AO"]) / float(info_dic["DP"]), 2)
                        if freq < 0.50:
                            count_less += 1
                        elif 0.50 < freq < 0.90:
                            count_more_50_less_90 += 1
                except Exception:  # skip lines that cannot be parsed
                    continue
        file_final.append(
            [
                re.findall("(?<=/)(.*?)(?=_snpeff.vcf)", file)[0],
                count_less,
                count_more_50_less_90,
            ]
        )  # type: ignore

    with open(output_file, mode="w") as f:
        f_writer = csv.writer(f, delimiter=",")
        f_writer.writerows(file_final)


if __name__ == "__main__":
    files_path = sys.argv[1].split(" ")
    output_file = sys.argv[2]
    create_graph_csv(files_path, output_file)
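A usage sketch, assuming the functions above are importable and the VCFs exist (paths are assumptions). Note that the sample column is taken from the part of each path between the first '/' and '_snpeff.vcf', so the files need a directory component.

# Hypothetical example: count sub-consensus variants for two samples.
create_graph_csv(
    ["freebayes/sampleA_snpeff.vcf", "freebayes/sampleB_snpeff.vcf"],
    "minor_variant_counts.csv",  # CSV with counts of variants below 50% and between 50% and 90%
)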
from random import choices


def random_string(length):
    """
    The random_string function returns a random string of length n.

    :param length: Determine the length of the random string
    :return: A string of random characters
    :doc-author: Trelent
    """
    abc = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
    return "".join(choices(abc, k=length))
import sys
import csv
from Bio import SeqIO
from extract_gb_info import get_locus


def parse_depth(depth_file):
    with open(depth_file, "r", encoding="UTF8") as depth:
        reader = csv.reader(depth)
        out = []
        for i in reader:
            out.append(i[0].split("\t"))
    return out


def get_consensus(consensus):
    with open(consensus, "r", encoding="UTF8") as handle_fasta:
        return list(SeqIO.parse(handle_fasta, "fasta"))[0].seq


def split_consensus(consensus, depth, locus, output):
    depth = parse_depth(depth)
    locus = get_locus(locus)
    dic = {}
    for i in locus:
        dic[i] = ""
    for entry in depth:
        dic[entry[0]] = entry[1]
    numbers_list = []
    for i in locus:
        numbers_list.append(int(dic[i]))
    consensus_seq = get_consensus(consensus)
    # print(len(consensus_seq))
    # consensus_seq = "".join(['A' for x in range(0,13136)])
    # print(consensus_seq)

    last_list = []
    # count_nt = 0
    seq = " "

    new_dic = {}
    count = 0
    for loc in locus:
        new_dic[loc] = [count, count + int(dic[loc])]
        # print(new_dic[loc])
        count += int(dic[loc])

    for loc in new_dic:
        small_part = consensus_seq[new_dic[loc][0] : new_dic[loc][1]]
        seq = small_part
        last_list.append(seq)

    with open(output, "w", encoding="UTF8") as out:
        for idx, seq in enumerate(last_list):
            out.writelines(f">{locus[idx]}\n")
            out.writelines(seq)
            out.writelines("\n")


if __name__ == "__main__":
    CONSENSUS = sys.argv[1]
    DEPTH = sys.argv[2]
    LOCUS = sys.argv[3]
    OUTPUT = sys.argv[4]
    split_consensus(consensus=CONSENSUS, depth=DEPTH, locus=LOCUS, output=OUTPUT)
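A usage sketch, assuming the script above is importable and the files exist (all names are assumptions). Despite the argument name, the second file appears to hold per-locus base counts rather than per-position depth: each tab-separated line pairs a locus name with the number of consensus bases belonging to it, and those counts are used to cut the concatenated consensus back into one record per locus.

split_consensus(
    consensus="sample1_consensus.fasta",  # single concatenated consensus (assumed name)
    depth="sample1.depth",                 # TSV: <locus name>\t<bases in that locus>
    locus="reference.gbk",                 # reference GenBank listing the locus names
    output="sample1_split.fasta",          # output: one FASTA record per locus
)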
import sys
from extract_gb_info import get_locus


def split_depth_file(file_path, reference_gb):
    """
    The split_depth_file function takes a file path as an argument and splits the depth file into separate files for each reference sequence.
    The function takes the last position of a string containing the full path to the depth file as its starting point, then finds where in that string
    the first instance of "/" occurs. It then uses this index to slice out everything after it and stores it in a variable called filename. The function
    then creates another variable called new_file; which is empty at first but will eventually contain all lines from our original depth file except for
    those corresponding to our reference sequence (which we know because they start with the number)

    :param file_path: Specify the path to the file that is being split
    :return: A list of lines from the depth file that correspond to the reference sequence
    :doc-author: Trelent
    """
    last_pos = len(file_path) - file_path[::-1].index("/")
    path = file_path[:last_pos]
    reference_list = get_locus(reference_gb)
    for name in reference_list:
        with open(file_path, "r", encoding="utf-8") as handle_depth:
            new_file = []
            for line in handle_depth.readlines():
                if line.split()[0] == str(name):
                    new_file.append(line)
            if new_file:
                with open(f"{path}{name}.depth", "w", encoding="utf8") as output_file:
                    output_file.writelines(new_file)
            # else:
            # print("Action already taken.")


if __name__ == "__main__":
    DEPTH_FILE = sys.argv[1]
    REFERENCE_GB = sys.argv[2]
    split_depth_file(DEPTH_FILE, REFERENCE_GB)
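A usage sketch with assumed paths, assuming the script above is importable; for each locus in the reference that has depth lines, a <locus>.depth file is written next to the input file.

# e.g. creates align_samples/sample1/<locus>.depth for every locus found in the depth file.
split_depth_file("align_samples/sample1/snps.depth", "reference.gbk")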
import sys
from Bio import SeqIO
from extract_gb_info import get_locus


def get_locus_out_id(string_id: str):
    if string_id.find("__") == -1:
        return False
    return string_id[len(string_id) - string_id[::-1].index("__"):]


def prepare_msa_masker(alignment, outdir, reference_gb):
    """
    The prepare_msa_masker function takes a multiple sequence alignment and a reference genbank file as input.
    It then creates an output directory for each locus in the reference genome, and writes the sequences of that locus to
    a fasta file within each output directory.

    :param alignment: Specify the path to the alignment file
    :param outdir: Specify the directory where the files will be saved
    :param reference_gb: Get the locus of each segment in the alignment
    :return: None; one FASTA file per locus is written under outdir
    :doc-author: Trelent
    """
    locus_names = get_locus(reference_gb)

    locus_records = {x: [] for x in locus_names}

    for locus in locus_records:
        for record in SeqIO.parse(alignment, "fasta"):
            if locus == record.id or locus == get_locus_out_id(record.id):
                locus_records[locus].append(record)
    for locus, list_of_records in locus_records.items():
        SeqIO.write(
            list_of_records,
            f"{outdir}/{locus}/Alignment_nt_{locus}.fasta",
            "fasta",
        )


if __name__ == "__main__":
    alignment = sys.argv[1]
    outdir = sys.argv[2]
    reference_gb = sys.argv[3]
    prepare_msa_masker(alignment, outdir, reference_gb)
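A usage sketch with assumed paths, assuming the script above is importable. The per-locus output directories must already exist, since the SeqIO.write call above does not create them.

import os

for locus_name in ["locus_1", "locus_2"]:   # assumed locus names from the reference GenBank
    os.makedirs(f"masked/{locus_name}", exist_ok=True)

prepare_msa_masker("Alignment_nt_All.fasta", "masked", "reference.gbk")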
import sys
import csv
from Bio import SeqIO
import extract_gb_info as ggb
from yaml_io import read_yaml
import textwrap


def write_fasta(dictionary: dict[str, str], filename: str):
    with open(filename, "w") as fasta:
        for key, value in dictionary.items():
            fasta.write(f">{key}\n")
            fasta.write("\n".join(textwrap.wrap(str(value), 70)))
            fasta.write("\n")


def get_reference(file_name: str) -> SeqIO.SeqRecord:
    with open(file_name, "r") as handle:
        return next(SeqIO.parse(handle, "fasta"))


def get_ref_adjusted_positions(
    alignment: str, positions: list[tuple[str, list[list[int]]]], gene
) -> list[tuple[str, list[list[int]]]]:
    new_positions = []
    ref = get_reference(alignment)

    index_list = [idx for idx, nucleotide in enumerate(
        ref) if nucleotide != "-"]
    for gene_group in positions:
        if gene_group[0] == gene:
            for group in gene_group[1]:
                group[0] = index_list[group[0]]
                if group[1] >= len(index_list):
                    group[1] = index_list[-1]
                else:
                    group[1] = index_list[group[1]]
            new_positions.append(gene_group)
    return new_positions


def get_coverage_to_translate_matrix(filename: str) -> dict[str, list[str]]:
    with open(filename) as csvfile:
        csv_reader = csv.reader(csvfile)
        coverage_dic: dict[str, list[str]] = {
            row[0]: row[1:] for row in csv_reader}
    return coverage_dic


def get_position_in_list(reference: str, locus: str) -> int:
    list_of_locus: list[str] = ggb.get_locus(reference)
    return list_of_locus.index(locus)


def get_best_translation(record, pos: list[int]) -> str:
    options = ["", "", ""]
    options[0] = record.seq[pos[0]: pos[1]].ungap(
    ).translate(table=11, to_stop=False)
    options[1] = record.seq[pos[0] + 1: pos[1]].ungap(
    ).translate(table=11, to_stop=False)
    options[2] = record.seq[pos[0] + 2: pos[1]].ungap(
    ).translate(table=11, to_stop=False)

    idx = 0
    min_idx = 0
    min_value = options[0][:-1].count("*")
    for entry in options[1:]:
        idx += 1
        new_value = entry[:-1].count("*")
        if new_value < min_value:
            min_value = new_value
            min_idx = idx
    return options[min_idx]


def get_new_consensus(positions:  list[tuple[str, list[list[int]]]],
                      coverage_dic: dict[str, list[str]],
                      coverage_value: float,
                      gene_idx: int) -> dict[str, dict[str, str]]:
    new_consensus: dict[str, dict[str, str]] = {}
    for gene_structure in positions:
        gene_name = gene_structure[0]
        gene_positions = gene_structure[1]
        new_consensus[gene_name] = {}
        for pos in gene_positions:
            for record in SeqIO.parse(alignment, "fasta"):
                if record.id == reference_id:
                    if new_consensus[gene_name].get(record.id, False):
                        new_consensus[gene_name][record.id] += get_best_translation(
                            record, pos)
                    else:
                        new_consensus[gene_name][record.id] = get_best_translation(
                            record, pos)
                    continue

                seq = "".join([str(record.seq)[pos[0]: pos[1]].replace("-", "")
                              for pos in gene_positions])
                gene_length = sum([pos[1] - pos[0] for pos in gene_positions])
                if seq.count("N") / gene_length > 0.1:
                    continue
                identifier = record.id
                record_coverage = float(
                    coverage_dic[identifier[: identifier.index(
                        f"__{reference_id}")]][gene_idx]
                )
                if record_coverage >= coverage_value:
                    if new_consensus[gene_name].get(identifier, False):
                        new_consensus[gene_name][record.id] += get_best_translation(
                            record, pos)
                    else:
                        new_consensus[gene_name][identifier] = get_best_translation(
                            record, pos)
    return new_consensus


def write_fast_aa(
    reference: str,
    alignment: str,
    output: str,
    reference_id: str,
    gene: str,
    coverage: str,
):
    coverage_dic = get_coverage_to_translate_matrix(coverage)

    constants: dict[str, str] = read_yaml("../config/constants.yaml")

    coverage_value: int = read_yaml(constants["software_parameters"])[
        "min_coverage_consensus"
    ]

    gene_idx = get_position_in_list(reference, reference_id)

    positions: list[tuple[str, list[list[int]]]
                    ] = ggb.get_positions_gb(reference)
    positions = get_ref_adjusted_positions(alignment, positions, gene)

    new_consensus = get_new_consensus(positions,
                                      coverage_dic,
                                      coverage_value,
                                      gene_idx)

    for value in new_consensus.values():
        write_fasta(value, output)


if __name__ == "__main__":
    reference = sys.argv[1]
    alignment = sys.argv[2]
    output = sys.argv[3]
    reference_id = sys.argv[4]
    gene = sys.argv[5]
    coverage = sys.argv[6]

    write_fast_aa(reference, alignment, output, reference_id, gene, coverage)
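An invocation sketch with placeholder names. The script is meant to be run as a command rather than imported: get_new_consensus reads the module-level alignment and reference_id names, which are only bound in the __main__ block, and it also reads ../config/constants.yaml, so the working directory matters.

import subprocess

# Hypothetical example; the script name, gene and locus names are placeholders.
subprocess.run(
    [
        "python", "translate_genes.py",        # assumed script name
        "reference.gbk",                        # reference GenBank with gene positions
        "Alignment_nt_locus_1.fasta",           # per-locus nucleotide alignment
        "Alignment_aa_locus_1_gene_X.fasta",    # output amino-acid FASTA
        "locus_1",                              # locus identifier used in the record ids
        "gene_X",                               # gene to extract and translate
        "coverage_translate.csv",               # script-readable coverage matrix
    ],
    check=True,
)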
import csv
import re

# from typing_extensions import TypedDict
import sys

DESCRIPTORS = [
    "CHROM",
    "POS",
    "TYPE",
    "REF",
    "ALT",
    "FREQ",
    "COVERAGE",
    "EVIDENCE",
    "FTYPE",
    "STRAND",
    "NT_POS",
    "AA_POS",
    "EFFECT",
    "NT CHANGE",
    "AA CHANGE",
    "AA CHANGE ALT",
    "LOCUS_TAG",
    "GENE",
    "PRODUCT",
    "VARIANTS IN INCOMPLETE LOCUS",
]

#
# class SnakemakeEntry(TypedDict):
#     chrom: str
#     pos: int
#     ref: str
#     alt: str
#     qual: float
#     info: dict[str, str]
#     ann: dict[str, str]
#     names: list[str]
#     values: list[str]


def remove_smallcase(string):
    return "".join(letter for letter in string if not letter.islower())


def smaller(string: str) -> str:
    return string[:2] + remove_smallcase(string[2:])


def convert_list_to_dict(data: list[str]) -> tuple[dict[str, str], dict[str, str]]:
    snp_annotation = [
        "Allele",
        "Annotation",
        "Annotation_Impact",
        "Gene_Name",
        "Gene_ID",
        "Feature_Type",
        "Feature_ID",
        "Transcript_BioType",
        "Rank",
        "HGVS.c",
        "HGVS.p",
        "cDNA.pos/cDNA.length",
        "CDS_CDSLength",
        "AA.pos/AA.length",
        "Distance",
        "ERRORS/WARNINGS/INFO",
    ]
    new_dict = {}

    ann_dict = {}
    for item in data:
        if item:
            k, v = item.split("=")
            if k.upper() == "ANN":
                new_v = v.split("|")
                ann_dict = {snp_annotation[idx]: new_v[idx]
                            for idx in range(16)}
            else:
                new_dict[k] = v
    return new_dict, ann_dict


# class DataEntry(TypedDict, total=False):
#     "CHROM"
#     "POS"
#     "TYPE"
#     "REF"
#     "ALT"
#     "FREQ"
#     "COVERAGE"
#     "EVIDENCE"
#     "FTYPE"
#     "STRAND"
#     "NT_POS"
#     "AA_POS"
#     "EFFECT"
#     "NT CHANGE"
#     "AA CHANGE"
#     "AA CHANGE ALT"
#     "LOCUS_TAG"
#     "GENE"
#     "PRODUCT"
#     "VARIANTS IN INCOMPLETE LOCUS"
#
def get_multiple_freq(AO, DP):
    dp_value = float(DP)
    freqs = [float(v) / dp_value for v in AO.split(",")]
    return round(min(freqs), 2)


def get_type(a, b):
    if a == b:
        return "snp"
    elif a > b:
        return "del"
    elif a < b:
        return "ins"
    else:
        return "complex"


def get_info_on(dictionary, key, before=""):
    return before + dictionary.get(key) if dictionary.get(key, False) else ""


def analyse_file(input_file):
    with open(input_file) as handler:
        lines = [line for line in handler.readlines() if line[0] != "#"]
    vcf_data = []
    for line in lines:
        line = line.split()
        info, ann = convert_list_to_dict(line[7].split(";"))
        entry = {
            "chrom": line[0],
            "pos": int(line[1]),
            "ref": line[3],
            "alt": line[4],
            "qual": float(line[5]),
            "info": info,
            "ann": ann,
            "names": line[8].split(":"),
            "values": line[9].split(":"),
        }

        get_freq = entry["info"].get("FREQ", False)
        if not get_freq:
            try:
                get_freq = get_multiple_freq(
                    entry["info"]["AO"], entry["info"]["DP"])
            except KeyError:
                get_freq = round(float(entry["values"][-1]), 2)
        else:
            get_freq = round(float(get_freq) / 100, 2)
        default = [
            entry["chrom"],
            entry["pos"],
            entry["info"].get("TYPE", get_type(
                len(entry["ref"]), len(entry["alt"]))),
            entry["ref"],
            entry["alt"],
            get_freq,
            get_info_on(entry["info"], "DP"),
            get_info_on(entry["info"], "AO", f"{entry['alt']}:"),
            get_info_on(entry["info"], "RO", f"{entry['ref']}:"),
        ]

        if entry.get("ann"):
            ann = entry["ann"]

            default.extend(
                [
                    "CDS",
                    "+",
                    ann["CDS_CDSLength"],
                    ann["AA.pos/AA.length"],
                    ann["Annotation"],
                    ann["HGVS.c"],
                    ann["HGVS.p"],
                    smaller(ann["HGVS.p"]),
                    ann["Gene_Name"],
                    ann["Gene_Name"],
                    "yes",
                ]
            )
        else:
            default.extend(["", "", "", "", "", "", "", "", "", "", ""])
        vcf_data.append(default)
    # print(vcf_data)
    return vcf_data


def word_in_list(word, check_list):
    return any(item in word for item in check_list)


def comparison(value1, value2, signal):
    if signal == "bigger":
        return value1 >= value2
    elif signal == "smaller":
        return value1 < value2


def filter_variants(
    data: list[list[str]], signal: str, list_of_types: list[str], identifier: str
) -> list[list[str]]:
    filtered_data = []
    for entry in data:
        entry_d = {DESCRIPTORS[idx]: value for idx, value in enumerate(entry)}
        if not comparison(entry_d["FREQ"], 0.51, signal):
            continue
        if not word_in_list(entry_d["TYPE"], list_of_types):
            continue
        entry.insert(0, identifier)
        filtered_data.append(entry)

    return filtered_data


def validated_variants(files_path, output_file, signal, re_expression, list_of_types):
    """
    The validated_variants function takes a list of vcf files and outputs a csv file with the following columns:
        ID, CHROM, POS, TYPE, REF, ALT, FREQ (allele frequency), COVERAGE (total depth at that position), EVIDENCE (number of reads supporting variant)
        FTYPE(functional type), STRAND(strand bias for reads supporting variant), NT_POS(nucleotide position in codon)
        AA_POS(amino acid position in protein sequence) EFFECT(effect on gene function as reported by snpeff software package).


    :param files_path: Pass the path to the folder containing all of your vcf files
    :param output_file: Specify the name of the file that will be created
    :return: A csv file with the following columns:
    :doc-author: Trelent
    """
    vcf_final = [
        [
            "ID",
            "CHROM",
            "POS",
            "TYPE",
            "REF",
            "ALT",
            "FREQ",
            "COVERAGE",
            "EVIDENCE",
            "FTYPE",
            "STRAND",
            "NT_POS",
            "AA_POS",
            "EFFECT",
            "NT CHANGE",
            "AA CHANGE",
            "AA CHANGE ALT",
            "LOCUS_TAG",
            "GENE",
            "PRODUCT",
            "VARIANTS IN INCOMPLETE LOCUS",
        ]
    ]
    for file in files_path:
        identifier = re.findall(re_expression, file)[0]
        clean_file = analyse_file(file)
        filtered_file = filter_variants(
            clean_file, signal, list_of_types, identifier)
        vcf_final.extend(filtered_file)
    with open(output_file, mode="w") as f:
        f_writer = csv.writer(f, delimiter=",")
        f_writer.writerows(vcf_final)


if __name__ == "__main__":
    files_path = sys.argv[1].split(" ")
    output_file = sys.argv[2]
    type_of_file = sys.argv[3]
    if type_of_file == "validated_variants":
        signal = "bigger"
        re_expression = "(?<=sample__)(.*?)(?=_snpeff.vcf)"
        list_of_words = ["snp", "mnp", "complex", "ins", "del"]
    elif type_of_file == "minor_iSNVs_inc_indels":
        signal = "smaller"
        re_expression = "(?<=/freebayes/)(.*?)(?=_snpeff.vcf)"
        list_of_words = ["snp", "del", "ins"]
    elif type_of_file == "minor_iSNVs":
        signal = "smaller"
        re_expression = "(?<=/freebayes/)(.*?)(?=_snpeff.vcf)"
        list_of_words = ["snp", "complex"]
    else:
        raise RuntimeError("That was a bad call")  # to change
    validated_variants(
        files_path,
        output_file,
        signal,
        re_expression,
        list_of_words,
    )
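An invocation sketch (paths and the script name are assumptions). The third argument selects the filtering mode defined above: validated_variants keeps variants with a frequency at or above 0.51, while minor_iSNVs and minor_iSNVs_inc_indels keep those below 0.51; the mode also controls the regular expression used to extract the sample name from each path.

import subprocess

# Hypothetical example: build the table of validated (>= 51% frequency) variants.
subprocess.run(
    [
        "python", "variants_table.py",  # assumed script name
        "projects/demo/sample__sampleA_snpeff.vcf projects/demo/sample__sampleB_snpeff.vcf",
        "validated_variants.csv",        # output CSV
        "validated_variants",            # mode: 'validated_variants', 'minor_iSNVs' or 'minor_iSNVs_inc_indels'
    ],
    check=True,
)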
import yaml


def read_yaml(yaml_file_path: str) -> dict:
    with open(yaml_file_path, encoding="utf-8") as yaml_file:
        return yaml.load(yaml_file, Loader=yaml.FullLoader)


def write_yaml(yaml_file_path: str, dump_dict: dict) -> None:
    with open(yaml_file_path, "w", encoding="utf-8") as file:
        yaml.dump(dump_dict, file)
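A short round-trip example of the two helpers above (the file name and keys are arbitrary):

# Write a small parameter map and read it back.
params = {"mincov": 10, "mapqual": 20}
write_yaml("software_parameters.yaml", params)
assert read_yaml("software_parameters.yaml") == params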
Created: 1yr ago
Updated: 1yr ago
Maintainers: public
URL: https://github.com/RdrigoE/insaflu_snakemake
Name: insaflu_snakemake
Version: v1.0.0
Copyright: Public Domain
License: None