Variant Filtering Module for VCF Files with Region and Annotation Filters

public public 1yr ago Version: v0.3.0 0 bookmarks

Collection of variant filters

:speech_balloon: Introduction

The module consists of rules used to filter VCF files. Currently the VCFs can be filtered based on regions and annotation.

Region based filtering

Regions filtering is based on bcftools and inculdes or excludes variants based on a bed file.

Annotation based filtering

Filter criteria are defined in a .yaml file and can be hard filters or soft filters where a filter flag is added to the filter column in the VCF file. Example of annotations that can be used are those found in the format column or added by VEP. Filter criteria can be combined by logical operators. For matching annotations there is the possibility to use regular expressions in combination with for example exist. Also, NA-behavior can be specified.

:heavy_exclamation_mark: Dependencies

In order to use this module, the following dependencies are required:

:school_satchel: Preparations

Sample and unit data

Input data should be added to samples.tsv and units.tsv . The following information need to be added to these files:

Column Id Description
samples.tsv
sample unique sample/patient id, one per row
tumor_content ratio of tumor cells to total cells
units.tsv
sample same sample/patient id as in samples.tsv
type data type identifier (one letter), can be one of T umor, N ormal, R NA
platform type of sequencing platform, e.g. NovaSeq
machine specific machine id, e.g. NovaSeq instruments have @Axxxxx
flowcell identifer of flowcell used
lane flowcell lane number
barcode sequence library barcode/index, connect forward and reverse indices by + , e.g. ATGC+ATGC
fastq1/2 absolute path to forward and reverse reads
adapter adapter sequences to be trimmed, separated by comma

:white_check_mark: Testing

The workflow repository contains a small test dataset .tests/integration which can be run like so:

cd .tests/integration
$ snakemake -s ../../Snakefile -j1 --configfile config.yaml --use-singularity

:rocket: Usage

To use this module in your workflow, follow the description in the snakemake docs . Add the module to your Snakefile like so:

module biomarker:
 snakefile:
 github(
 "hydra-genetics/filter",
 path="workflow/Snakefile",
 tag="v0.1.0",
 )
 config:
 config
use rule * from biomarker as biomarker_*

Compatibility

Latest:

  • annotation:v0.1.0

  • snv_indel:v0.2.0

See COMPATIBLITY.md file for a complete list of module compatibility.

Input files

File Description
hydra-genetics/annotation
annotation/{file}.vcf.gz annotated vcf
hydra-genetics/snv_indel data
cnv_sv/{file}.vcf.gz non-annotated vcf

Output files

The following output files should be targeted via another rule (see Config for more info):

File Description
{file}.include.{tag}.vcf.gz vcf filtered by bcftools include
{file}.exclude.{tag}.vcf.gz vcf filtered by bcftools exclude
{file}.filter.{tag}.vcf.gz vcf filtered based on annotations

Config

The {tag} in the output file must be defined in the config file for the specifc rule that should be applied, see example in config.yaml where:

  • row 14 defines tag: 'noexon1' for rule bcftools_filter_include_region

  • row 20 defines tag: 'snv_hard_filter' for rule filter_vcf

:judge: Rule Graph

Filtering

rule_graph

Code Snippets

36
37
38
39
40
41
shell:
    "(bcftools filter "
    "{params.filter} "
    "{params.extra} "
    "{input.vcf} "
    "-o {output.vcf}) &> {log}"
73
74
75
76
77
78
shell:
    "(bcftools filter "
    "{params.filter} "
    "{params.extra} "
    "{input.vcf} "
    "-o {output.vcf}) &> {log}"
108
109
wrapper:
    "v1.24.0/bio/bcftools/view"
31
32
script:
    "../scripts/filter_vcf.py"
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
import logging
import re
import yaml

from collections import OrderedDict
from hydra_genetics.utils.io import utils
from pysam import VariantFile


def is_float(element) -> bool:
    try:
        float(element)
        return True
    except ValueError:
        return False


def _and_function(v1, v2): return v1 and v2


def _or_function(v1, v2): return v1 or v2


def _parse_helper(f_iterator):
    """
    function used to parse a string with parenthesis and create a nested fail_filter_list

    :param f_iterator: filter string that will be parsed
    :type f_iterator: string iterator

    :return: return a nested list
    """
    data = []
    filter_string = ''
    for item in f_iterator:
        if item == '(':
            result, closing_parantes = _parse_helper(f_iterator)
            if len(result) == 1:
                result = result[0]
            data.append(result)
            if not closing_parantes:
                raise SyntaxError("Missing closing parantes!")
        elif item == ')':
            if len(filter_string) != 0:
                data.append(filter_string)
            return data, True
        else:
            filter_string += item
            if filter_string.startswith(' or '):
                data.append(_or_function)
                result, p_found = _parse_helper(f_iterator)
                if len(result) == 1:
                    result = result[0]
                data.append(result)
                return data, p_found
            elif filter_string.startswith(' and '):
                data.append(_and_function)
                result, p_found = _parse_helper(f_iterator)
                if len(result) == 1:
                    result = result[0]
                data.append(result)
                return data, p_found
            elif filter_string.endswith(' or '):
                data.append(filter_string[:-4])
                data.append(_or_function)
                result, p_found = _parse_helper(f_iterator)
                if len(result) == 1:
                    result = result[0]
                data.append(result)
                return data, p_found
            elif filter_string.endswith(' and '):
                data.append(filter_string[:-5])
                data.append(_and_function)
                result, p_found = _parse_helper(f_iterator)
                if len(result) == 1:
                    result = result[0]
                data.append(result)
                return data, p_found
    if len(filter_string) != 0:
        data.append(filter_string)
    return data, False


def _convert_string(data, process_function):
    """
    converts a nested list of string into functions
    :params data: data structure with string
    :type list: nested list with strings
    :params process_function: function used to convert string to lambda function
    :type list: function

    :return: nested list with lambda functions
    """
    if isinstance(data, str):
        return process_function(data)
    elif isinstance(data, list):
        if len(data) == 3:
            return [_convert_string(data[0], process_function), data[1], _convert_string(data[2], process_function)]
        elif len(data) == 1:
            return _convert_string(data[0], process_function)
        else:
            raise Exception("Unhandled data structure case {}, unexpected number of items {}!".format(data, len(data)))
    else:
        raise Exception("Unhandled data structure case {}\ntype: {}\nlength: {}".format(data, type(data), len(data)))


def _evaluate_expression(data, variant):
    """
    function used to evaluate variant using a nested list of functions

    :param data: a nested list with functions that will be evaluated
    :type data: nested list
    :param variant: variant that will be evaluated
    :type process_function: a function
    :param

    :return: boolean
    """
    if callable(data):
        return data(variant)
    elif isinstance(data, list):
        if len(data) == 3:
            return data[1](_evaluate_expression(data[0], variant), _evaluate_expression(data[2], variant))
        elif len(data) == 1:
            return _evaluate_expression(data[0], variant)
        else:
            raise Exception("Unhandled data structure case {}, unexpected number of items {}!".format(data, len(data)))
    else:
        raise Exception("Unhandled data structure case {}!".format(data))


def create_variant_filter(filter, string_parser):
    data, _ = _parse_helper(iter(filter))
    data = _convert_string(data, string_parser)
    return lambda variant: _evaluate_expression(data, variant)


def create_convert_expression_function(annotation_extractors):
    """
    :param annotation_extractors: dict with functions that can extract annotations
    :type annotation_extractors: dict
    """

    def na_handling_helper(na_handling, expression):
        if na_handling == "NA_TRUE":
            return True
        elif na_handling == "NA_FALSE":
            return False
        elif na_handling == "NA_ERROR":
            raise ValueError("Couldn't evaluate {} due to missing value".format(expression))

    def regex_compare(regex_exist, value, expression=""):
        if value is None:
            value = ""
        if "!exist" in expression:
            return re.search(regex_exist, value) is None
        else:
            return re.search(regex_exist, value) is not None

    def compare_data(comparison, value1, value2, index1=None, index2=None, na_handling="NA_FALSE", expression=""):
        if isinstance(value1, float):
            if value2 is None:
                return na_handling_helper(na_handling, expression)
            if index2 is not None:
                value2 = value2[index2]
            return comparison(value1, float(value2))
        elif isinstance(value2, float):
            if value1 is None:
                return na_handling_helper(na_handling, expression)
            if index1 is not None:
                value1 = value1[index1]
            return comparison(float(value1), value2)
        else:
            if index2 is not None:
                value2 = value2[index2]
            if index1 is not None:
                value1 = value1[index1]
            if value1 == '-' and value2 is None:
                value2 = '-'
            if value2 == '-' and value1 is None:
                value1 = '-'
            if value1 is None or value2 is None:
                return na_handling_helper(na_handling, expression)
            return comparison(value1, value2)

    def convert_to_expression(expression):
        """
        Valid format of expression is:
        - DATA_SOURCE:NA_HANDLING(OPTIONAL):FIELD:COLUMN(OPTIONAL) [<|>|=|!=] VALUE
        - VALUE [<|>|=|!=] DATA_SOURCE:NA_HANDLING(OPTIONAL):FIELD:COLUMN(OPTIONAL)
        - exist[regex, DATA_SOURCE:NA_HANDLING(OPTIONAL):FIELD:COLUMN]
        - !exist[regex, DATA_SOURCE:NA_HANDLING(OPTIONAL):FIELD:COLUMN]

        DATA_SOURCE:
         - VEP
         - FORMAT
         - INFO
         - QUAL

        NA_HANDLING:
         - NA_TRUE: when None is found the expression will return True and filter variant
         - NA_FALSE: when None is found the expression will return False and not filter variant
         - NA_ERROR: when None is found the an error will be raised if value not found
        Default will be NA_FALSE, i.e a filter will not remove variant

        FIELD, any field in info, format or vep string

        COLUMN, used to extract value from tuple

        :params expression
        :type expression: string
        """
        comparison = {
            ">": lambda value1, value2: value1 > value2,
            "<": lambda value1, value2: value1 < value2,
            "=": lambda value1, value2: value1 == value2,
            "!=": lambda value1, value2: value1 != value2
        }

        # Extract information about how None values should be handled during filtering
        regex_na_handling = r"(VEP|FORMAT|INFO|QUAL):(NA_TRUE|NA_FALSE|NA_ERROR):"
        na_handling = re.search(regex_na_handling, expression)
        if na_handling:
            na_handling = na_handling.groups()[1]
            # Remove NA handling information from expression
            expression = re.sub(r"(NA_TRUE|NA_FALSE|NA_ERROR):", '', expression)
        else:
            na_handling = "NA_FALSE"
        regex_string = "[ ]*(VEP|FORMAT|INFO):([A-Za-z0-9_.]+):*([0-9]*)"

        if "exist[" in expression:
            # Handle exist expression
            # Example "exist[XM_[0-9]+, VEP:Feature]"
            exist_statment, regex_exist, field = re.search(r"([!]{0,1}exist)\[(.+),[ ]*(.+)\]", expression).groups()
            source, field, index = re.search(regex_string, field).groups()
            if len(index) > 0:
                def get_value(variant):
                    return annotation_extractors[source](variant, field)[int(index)]
            else:
                def get_value(variant):
                    value = annotation_extractors[source](variant, field)
                    if isinstance(value, tuple):
                        value = ",".join(value)
                    return value
            return lambda variant: regex_compare(regex_exist, get_value(variant), expression)
        elif "QUAL" in expression.strip():
            data = re.split("[ ]([<>=!]+)[ ]", expression)
            if "QUAL" in data[0]:
                value = float(data[2])
                return lambda variant: compare_data(
                                                        comparison[data[1]],
                                                        annotation_extractors["QUAL"](variant),
                                                        value,
                                                        na_handling=na_handling,
                                                        expression=expression
                                                    )
            elif "QUAL" in data[2]:
                value = float(data[0])
                return lambda variant: compare_data(
                                                        comparison[data[1]],
                                                        value,
                                                        annotation_extractors["QUAL"](variant),
                                                        na_handling=na_handling,
                                                        expression=expression
                                                    )

        else:
            # Handle comparison expression
            # Example "FORMAT:NA_TRUE:SB_mutect2:1 > 400"
            data = re.split("[ ]([<>=!]+)[ ]", expression)
            if len(data) != 3:
                raise Exception("Invalid expression: " + expression)

            if "VEP:" in data[0] or "FORMAT:" in data[0] or "INFO:" in data[0]:
                source, field, index = re.search(regex_string, data[0]).groups()
                if len(index) == 0:
                    index = None
                else:
                    index = int(index)
                try:
                    data[2] = data[2].rstrip(" ").lstrip(" ")
                    value2 = float(data[2])
                    return lambda variant: compare_data(
                                                            comparison[data[1]],
                                                            annotation_extractors[source](variant, field),
                                                            value2, index1=index, na_handling=na_handling,
                                                            expression=expression
                                                        )
                except ValueError:
                    return lambda variant: compare_data(
                                                            comparison[data[1]],
                                                            annotation_extractors[source](variant, field),
                                                            data[2], index1=index, na_handling=na_handling,
                                                            expression=expression
                                                        )
            elif "VEP:" in data[2] or "FORMAT:" in data[2] or "INFO:" in data[2]:
                source, field, index = re.search(regex_string, data[2]).groups()
                if len(index) == 0:
                    index = None
                else:
                    index = int(index)
                try:
                    data[0] = data[0].rstrip(" ").lstrip(" ")
                    value1 = float(data[0])
                    return lambda variant: compare_data(
                                                            comparison[data[1]],
                                                            value1,
                                                            annotation_extractors[source](variant, field),
                                                            index2=index, na_handling=na_handling,
                                                            expression=expression
                                                        )
                except ValueError:
                    return lambda variant: compare_data(
                                                            comparison[data[1]],
                                                            data[0],
                                                            annotation_extractors[source](variant, field),
                                                            index2=index, na_handling=na_handling,
                                                            expression=expression
                                                        )
            else:
                raise Exception("Could not find comparison field in: " + expression)

    return convert_to_expression


def check_yaml_file(variants, filters):
    for filter in filters["filters"]:
        if "expression" not in filters["filters"][filter]:
            raise Exception("No expression entry for %s" % filter)
        filter_text = ""
        if (
                "soft_filter" not in filters["filters"][filter] or
                ("soft_filter" in filters["filters"][filter] and filters["filters"][filter]["soft_filter"] != "False")
        ):
            if "soft_filter_flag" not in filters["filters"][filter]:
                raise Exception("No soft_filter_flag entry for %s" % filter)
            if "description" not in filters["filters"][filter]:
                filter_text = "Failed %s filter" % filter
            else:
                filter_text = filters["filters"][filter]["description"]
        elif "description" not in filters["filters"][filter]:
            filter_text = "Failed %s filter (hard filtered)" % filter
        else:
            filter_text = "%s %s" % (filters["filters"][filter]["description"], "(hard filtered)")
        if "soft_filter_flag" in filters["filters"][filter]:
            variants.header.filters.add(filters["filters"][filter]["soft_filter_flag"], None, None, filter_text)


def filter_variants(sample_name_regex, in_vcf, out_vcf, filter_yaml_file):
    variants = VariantFile(in_vcf)
    log = logging.getLogger()

    log.info("Load yaml filter config file")
    filters = {"filters": []}
    if filter_yaml_file is not None:
        log.info("Process yaml for: {}".format(filter_yaml_file))
        with open(filter_yaml_file) as file:
            filters = yaml.load(file, Loader=yaml.FullLoader)

    log.info("Checking yaml file parameters")
    check_yaml_file(variants, filters)

    log.info("Process vcf header: {}".format(in_vcf))
    annotation_extractor = {}
    for record in variants.header.records:
        if record.type == "INFO":
            if record['ID'] == "CSQ":
                log.info(" -- found vep information: {}".format(in_vcf))
                log.debug(" -- -- {}".format(record['Description'].split("Format: ")[1].split("|")))
                vep_fields = {v: c for c, v in enumerate(record['Description'].split("Format: ")[1].split("|"))}
                annotation_extractor["VEP"] = utils.get_annotation_data_vep(vep_fields)

    vcf_out = VariantFile(out_vcf, 'w', header=variants.header)

    log.info("Mapping samples")
    sample_format_index_mapper = {sample: index for index, sample in enumerate(variants.header.samples)}
    sample_index = 0

    sample_name_match = False
    number_of_matches = 0
    for name, index in sample_format_index_mapper.items():
        if re.search(sample_name_regex, name):
            sample_index = index
            sample_name_match = True
            number_of_matches = number_of_matches + 1

    if number_of_matches > 1:
        raise Exception("More then one sample match regex")

    if not sample_name_match and len(sample_format_index_mapper) > 1:
        raise Exception("More then one sample and no regex")

    if sample_name_match:
        log.info(f"Using index: {sample_index}, found using regex {sample_name_regex}")
    else:
        log.info(f"Using default index: {sample_index}, nothing found using regex {sample_name_regex}")

    log.info("Process variants")
    annotation_extractor['FORMAT'] = utils.get_annotation_data_format(sample_index)
    annotation_extractor['INFO'] = utils.get_annotation_data_info
    annotation_extractor['QUAL'] = lambda variant: None if variant.qual == '.' else variant.qual
    expression_converter = create_convert_expression_function(annotation_extractor)

    vcf_filters = []
    soft_filters = []
    for filter, value in filters["filters"].items():
        vcf_filters.append(create_variant_filter(value['expression'], expression_converter))
        if "soft_filter" in filters["filters"][filter]:
            soft_filters.append([filter, filters["filters"][filter]["soft_filter"] != "False"])
        else:
            soft_filters.append([filter, True])
    for variant in variants:
        hard_filter = False
        i = 0
        for vcf_filter in vcf_filters:
            try:
                if vcf_filter(variant):
                    if soft_filters[i][1]:
                        variant.filter.add(filters["filters"][soft_filters[i][0]]["soft_filter_flag"])
                    else:
                        hard_filter = True
                i += 1
            except TypeError as e:
                log.error("Could not filter variant: '{}' with filter '{}'\n".format(str(variant), str(vcf_filter)))
                raise e
        if not hard_filter:
            vcf_out.write(variant)

    vcf_out.close()


if __name__ == "__main__":
    sample_name_regex = snakemake.params.sample_name_regex
    in_vcf = snakemake.input.vcf
    out_vcf = snakemake.output.vcf
    filter_yaml_file = snakemake.params.filter_config

    filter_variants(sample_name_regex, in_vcf, out_vcf, filter_yaml_file)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
__author__ = "Johannes Köster"
__copyright__ = "Copyright 2016, Johannes Köster"
__email__ = "[email protected]"
__license__ = "MIT"


from snakemake.shell import shell
from snakemake_wrapper_utils.bcftools import get_bcftools_opts

bcftools_opts = get_bcftools_opts(snakemake, parse_ref=False, parse_memory=False)
extra = snakemake.params.get("extra", "")
log = snakemake.log_fmt_shell(stdout=True, stderr=True)

shell("bcftools view {bcftools_opts} {extra} {snakemake.input[0]} {log}")
ShowHide 4 more snippets with no or duplicated tags.

Login to post a comment if you would like to share your experience with this workflow.

Do you know this workflow well? If so, you can request seller status , and start supporting this workflow.

Free

Created: 1yr ago
Updated: 1yr ago
Maitainers: public
URL: https://github.com/hydra-genetics/filtering
Name: filtering
Version: v0.3.0
Badge:
workflow icon

Insert copied code into your website to add a link to this workflow.

Downloaded: 0
Copyright: Public Domain
License: GNU General Public License v3.0
  • Future updates

Related Workflows

cellranger-snakemake-gke
snakemake workflow to run cellranger on a given bucket using gke.
A Snakemake workflow for running cellranger on a given bucket using Google Kubernetes Engine. The usage of this workflow ...