Script to convert the PheWAS Catalog database to JSONs, with Ensembl gene IDs (ENSGIDs) and EFO IDs.


This repository contains a collection of modules which generate evidence for several internal data sources (“internal” meaning that the code is maintained by the data team; the data itself usually comes from sources outside Open Targets).

How to set up and update the environment

The file requirements.txt contains only the direct dependencies, with their exact versions pinned. Only this file should be edited directly.

Additionally, the file requirements-frozen.txt contains all direct and transitive dependencies with their exact versions pinned. It is generated automatically from the former file and is intended for reproducing the exact working environment (as much as possible) as of any particular commit. This file should not be edited directly.

These are the steps to update an environment:

  • Add, delete or update packages as necessary in requirements.txt

  • Install requirements with pip3 install -r requirements.txt

  • Make sure everything works

  • Update the frozen file with pip3 freeze > requirements-frozen.txt

  • Include both files, requirements.txt and requirements-frozen.txt, with your PR

Make sure to always work in a clean virtual environment to avoid any surprises.

How to generate the evidence

This will create a Google Cloud instance, SSH into it, install the necessary dependencies, and then generate, validate, and upload the evidence. Tweak the commands as necessary.

To run this, the following conditions related to the service account need to be satisfied:

  1. The service account must have the Storage Admin role for two buckets, _otar000-evidence_input_ and _otar001-core_.

  2. The service account must have the Compute Admin and Service Account User roles in the open-targets-eu-dev project.

  3. The user running the code must have access to use the service account.

By default, the generated evidence will be validated using the latest master snapshot of the JSON schema. This can be tweaked in configuration.yaml → global → schema.
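
For illustration only, the snippet below sketches what that validation amounts to, using the generic jsonschema Python library rather than the pipeline's own tooling; the evidence and schema file names are placeholders.

import gzip
import json

import jsonschema  # generic JSON Schema validator, used here purely for illustration

# Placeholder paths: point these at a generated evidence file and a schema snapshot.
EVIDENCE_FILE = 'cancer_biomarkers.json.gz'
SCHEMA_FILE = 'disease_target_evidence.schema.json'

with open(SCHEMA_FILE) as schema_file:
    schema = json.load(schema_file)

# Evidence files are JSON lines: one evidence string per line, validated independently.
with gzip.open(EVIDENCE_FILE, 'rt') as evidence_file:
    for line_number, line in enumerate(evidence_file, start=1):
        try:
            jsonschema.validate(instance=json.loads(line), schema=schema)
        except jsonschema.ValidationError as error:
            print(f'Line {line_number} failed validation: {error.message}')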

# Set parameters.
export INSTANCE_NAME=evidence-generation
export INSTANCE_ZONE=europe-west1-d
# Create the instance and SSH.
gcloud compute instances create \
 ${INSTANCE_NAME} \
 --project=open-targets-eu-dev \
 --zone=${INSTANCE_ZONE} \
 --machine-type=n1-highmem-32 \
 --service-account=426265110888[email protected] \
 --scopes=https://www.googleapis.com/auth/cloud-platform \
 --create-disk=auto-delete=yes,boot=yes,device-name=${INSTANCE_NAME},image=projects/ubuntu-os-cloud/global/images/ubuntu-2004-focal-v20210927,mode=rw,size=2000,type=projects/open-targets-eu-dev/zones/europe-west1-d/diskTypes/pd-balanced
gcloud compute ssh --zone ${INSTANCE_ZONE} ${INSTANCE_NAME}
screen
# Install the system dependencies.
sudo apt update
sudo apt install -y openjdk-8-jdk-headless python3-pip python3.8-venv
# Create a key for the GCP service account:
export FILENAME=~/gcp-account.json
export ACCOUNT_NAME=426265110888[email protected]
export PROJECT=open-targets-eu-dev
gcloud iam service-accounts keys create ${FILENAME} \
 --iam-account=${ACCOUNT_NAME} \
 --project ${PROJECT}
# Store the credentials in the default variable:
export GOOGLE_APPLICATION_CREDENTIALS=${FILENAME}
# Activate the environment and install Python dependencies.
git clone https://github.com/opentargets/evidence_datasource_parsers
cd evidence_datasource_parsers
python3 -m venv env
source env/bin/activate
pip3 install --upgrade pip setuptools
pip3 install -r requirements-frozen.txt
export PYTHONPATH="$PYTHONPATH:$(pwd)"

At this point, we are ready to run the Snakemake pipeline. The following options are available:

  • snakemake --cores all : Display help (the list of possible rules to be run) and do not run anything.

  • snakemake --cores all --until local : Generate all files, but do not upload them to Google Cloud Storage. The files generated in this way do not have prefixes, e.g. cancer_biomarkers.json.gz. This is done intentionally, so that the pipeline can be re-run the next day without having to re-generate all the files.

    • It is also possible to run only a single rule locally by substituting its name for “local”.
  • snakemake --cores all --until all : Generate all files and then upload them to Google Cloud Storage.

All individual parser rules are strictly local. The only rule which uploads files to Google Cloud Storage (all at once) is "all".
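
As a rough sketch of this layout (not the repository's actual Snakefile), a strictly local parser rule and an uploading “all” rule could be wired up as follows; the rule names, output file and bucket path are made up.

# Hypothetical Snakefile sketch; rule names, paths and the bucket are placeholders.
rule example_parser:
    output:
        'example_evidence.json.gz'
    shell:
        'python3 modules/ExampleParser.py --output_file {output}'

# Strictly local: generates the files but uploads nothing.
rule local:
    input:
        'example_evidence.json.gz'

# The only rule that uploads to Google Cloud Storage, all files at once.
rule all:
    input:
        'example_evidence.json.gz'
    shell:
        'gsutil cp {input} gs://example-bucket/evidence/'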

Some additional parameters which can be useful for debugging:

  • --keep-incomplete : This will keep the output files of failed jobs. Must only be used with local runs. Note that Snakemake uses the presence of the output files to decide which jobs to run, so the incomplete files must be removed after investigation and before any re-runs of the workflow.

  • --dry-run : Do not run the workflow, and only show the list of jobs to be run.

Processing Genetics Portal evidence

Evidence generation for the Genetics Portal is not automated in the Snakefile. It can be done separately using the following commands. It is planned that this step will eventually become part of the Genetics Portal release pipeline.

gcloud dataproc clusters create \
 cluster-l2g-data \
 --image-version=1.4 \
 --properties=spark:spark.debug.maxToStringFields=100,spark:spark.executor.cores=31,spark:spark.executor.instances=1 \
 --master-machine-type=n1-standard-32 \
 --master-boot-disk-size=1TB \
 --zone=europe-west1-d \
 --single-node \
 --max-idle=5m \
 --region=europe-west1 \
 --project=open-targets-genetics
gcloud dataproc jobs submit pyspark \
 --cluster=cluster-l2g-data \
 --project=open-targets-genetics \
 --region=europe-west1 \
 modules/GeneticsPortal.py -- \
 --locus2gene gs://genetics-portal-data/l2g/200127 \
 --toploci gs://genetics-portal-data/v2d/200207/toploci.parquet \
 --study gs://genetics-portal-data/v2d/200207/studies.parquet \
 --threshold 0.05 \
 --variantIndex gs://genetics-portal-data/variant-annotation/190129/variant-annotation.parquet \
 --ecoCodes gs://genetics-portal-data/lut/vep_consequences.tsv \
 --outputFile gs://genetics-portal-analysis/l2g-platform-export/data/genetics_portal_evidence.json.gz

Notes on how this repository is organised

Each module in modules/ corresponds to one evidence generator.

Modules which are shared by multiple generators reside in common/.

The Conda virtual environment files, as well as instructions on how to maintain them, are available in envs/.

Historical notes on individual parsers

Note that some information in this section may be obsolete. It is provided for historical purposes, and will be eventually migrated into the source code of individual parsers or into the Snakefile.

ClinGen

The ClinGen parser processes the Gene Validity Curations table that can be downloaded from https://search.clinicalgenome.org/kb/gene-validity . As of January 2021 the downloadable file contains six header lines that look as follows:

CLINGEN GENE VALIDITY CURATIONS
FILE CREATED: 2021-01-18
WEBPAGE: https://search.clinicalgenome.org/kb/gene-validity
++++++++++,++++++++++++++,+++++++++++++,++++++++++++++++++,+++++++++,+++++++++,+++++++++,++++++++++++++,+++++++++++++,+++++++++++++++++++
GENE SYMBOL,GENE ID (HGNC),DISEASE LABEL,DISEASE ID (MONDO),MOI,SOP,CLASSIFICATION,ONLINE REPORT,CLASSIFICATION DATE,GCEP
+++++++++++,++++++++++++++,+++++++++++++,++++++++++++++++++,+++++++++,+++++++++,+++++++++,++++++++++++++,+++++++++++++,+++++++++++++++++++
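
A minimal sketch of how these six metadata rows can be skipped when reading the file with PySpark (the same approach as in the ClinGen snippet further down; the input path is a placeholder):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()

clingen_df = (
    # Placeholder path; read without a header so the metadata rows become ordinary rows.
    spark.read.csv('gene-validity-curations.csv', header=False)
    # Number the rows and drop the first six, which contain only metadata.
    .withColumn('idx', monotonically_increasing_id())
    .filter(col('idx') > 5)
    .drop('idx')
)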

Gene2Phenotype

The Gene2Phenotype parser processes the four gene panels (Developmental Disorders - DD, eye disorders, skin disorders and cancer) that can be downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads/.

Genomics England Panel App

The Genomics England parser processes the associations between genes and diseases described in the Gene Panels Data table. This data is provided by Genomics England and can be downloaded here from the otar000-evidence_input bucket.

The source table is then formatted into a compressed set of JSON lines following the schema of the version to be used.
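
For context, evidence files are gzip-compressed JSON lines (one evidence string per line). A minimal sketch of producing such an output with PySpark is shown below; the columns and values are dummies, and the real parser derives many more fields from the Gene Panels Data table.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Dummy rows with a minimal set of illustrative columns.
evidence_df = spark.createDataFrame(
    [('ENSG00000000001', 'MONDO_0000001', 'genomics_england')],
    ['targetFromSourceId', 'diseaseFromSourceMappedId', 'datasourceId'],
)

# Write one JSON object per line, gzip-compressed.
evidence_df.coalesce(1).write.mode('overwrite').json('genomics_england_evidence', compression='gzip')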

IntOGen

The intOGen parser generates evidence strings from three files that need to be in the working directory or in the resources directory:

  • intogen_cancer2EFO_mapping.tsv : Mappings of intOGen cancer type acronyms to EFO terms. This is currently a manually curated file, given that intOGen does not use a specific ontology to model cancers.

  • intogen_cohorts.tsv : It contains information about the analysed cohorts and can be downloaded from the intOGen website. In the current implementation, the total number of samples included in each cohort is used to calculate the percentage of samples that carry a relevant mutation in a driver gene (see the sketch after this list).

  • intogen_Compendium_Cancer_Genes.tsv : It contains the genes that have been identified as drivers in at least one cohort and information related to the methods that yield significant results, the q-value and additional data about mutations. It can be downloaded from the same place as the cohorts file.
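
A minimal sketch of the percentage calculation mentioned above, using made-up cohort identifiers and simplified column names (the real files use different headers):

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Made-up cohort sizes (from intogen_cohorts.tsv in the real parser).
cohorts_df = spark.createDataFrame(
    [('COHORT_A', 100), ('COHORT_B', 250)],
    ['cohort', 'samples'],
)
# Made-up driver gene results per cohort (from intogen_Compendium_Cancer_Genes.tsv in the real parser).
drivers_df = spark.createDataFrame(
    [('GENE1', 'COHORT_A', 20), ('GENE1', 'COHORT_B', 75)],
    ['symbol', 'cohort', 'mutated_samples'],
)

# Percentage of samples in each cohort carrying a relevant mutation in the driver gene.
driver_evidence = drivers_df.join(cohorts_df, on='cohort', how='left').withColumn(
    'mutatedSamplesPercentage',
    f.round(f.col('mutated_samples') / f.col('samples') * 100, 2),
)
driver_evidence.show()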

CRISPR

The CRISPR parser processes three files available in the resources directory:

  • crispr_evidence.tsv : Main file that contains the prioritisation score as well as target, disease and publication information. It was adapted from supplementary table 6 in the Behan et al. 2019 paper.

  • crispr_descriptions.tsv : File used to extract the number of targets prioritised per cancer type, as well as to retrieve the cell lines from the next file.

  • crispr_cell_lines.tsv : It contains three columns with the cell line, tissue and cancer type information. It is used to extract the cell lines in which the target has been identified as essential. The file has been adapted from supplementary table 1 in the Behan et al. 2019 paper.

PROGENy

The PROGENy parser processes three files:

  • progeny_normalVStumor_opentargets.txt: Main file that contains a systematic comparison of pathway activities between normal and primary TCGA samples in 14 tumor types using PROGENy. This file can be downloaded here from the otar000-evidence_input bucket.

  • cancer2EFO_mappings.tsv: File containing the mappings between the acronym of the type of cancer and its respective disease listed in EFO. This file can be found in the resources directory.

  • pathway2Reactome_mappings.tsv: File containing the mappings between each analysed pathway and its respective target and ID in Reactome. This file can be found in the resources directory. (A sketch of how these mappings are combined with the main file follows this list.)
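
A rough sketch of how these two mapping files are combined with the main table; the column names and rows below are illustrative and may not match the real file headers.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative rows only; the headers are assumptions, not the real file columns.
progeny_df = spark.createDataFrame(
    [('BRCA', 'MAPK', 0.001)], ['Cancer_type', 'Pathway', 'p_value'])
cancer2efo_df = spark.createDataFrame(
    [('BRCA', 'EFO_0000305')], ['Cancer_type', 'diseaseFromSourceMappedId'])
pathway2reactome_df = spark.createDataFrame(
    [('MAPK', 'ENSG00000169032', 'R-HSA-5683057')], ['Pathway', 'targetFromSourceId', 'reactomeId'])

# Resolve the cancer type acronym to an EFO disease and the pathway to its Reactome target and ID.
progeny_evidence = (
    progeny_df
    .join(cancer2efo_df, on='Cancer_type', how='left')
    .join(pathway2reactome_df, on='Pathway', how='left')
)
progeny_evidence.show()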

SystemsBiology

This parser processes two files stored in a Google Cloud bucket: gs://otar000-evidence_input/SysBio/data_files. The sysbio_evidence-31-01-2019.tsv file contains evidence data, while sysbio_publication_info_nov2018.tsv contains study-level information.

SLAPenrich

The SLAPenrich parser processes two files:

  • slapenrich_opentargets.tsv : Main file that contains a systematic comparison of somatic mutations from TCGA across 25 different cancer types and a collection of pathway gene sets from Reactome. This file can be downloaded here from the otar000-evidence_input bucket.

  • cancer2EFO_mappings.tsv : File containing the mappings between the acronym of the type of cancer and its respective disease listed in EFO. This file can be found in the resources directory.

IMPC

Generates the mouse model target-disease evidence by querying the IMPC SOLR API.

The base of the evidence is the disease_model_summary table, which is unique on the combination of ( model_id , disease_id ). When target information is added, an original row may explode into multiple evidence strings. As a result, the final output is unique on the combination of ( biologicalModelId , targetFromSourceId , targetInModelId , diseaseFromSourceId ).
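
A toy PySpark sketch of the fan-out described above: a single (model_id, disease_id) row joined to its target annotations yields one evidence row per target, hence the composite uniqueness key. All identifiers below are made up.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# One made-up row per (model_id, disease_id) combination.
disease_model_summary = spark.createDataFrame(
    [('MGI:model_1', 'ORPHA:123')],
    ['model_id', 'disease_id'],
)
# Made-up target annotation: the mouse gene and its human orthologues for the model.
model_targets = spark.createDataFrame(
    [('MGI:model_1', 'MGI:gene_1', 'ENSG00000000001'),
     ('MGI:model_1', 'MGI:gene_1', 'ENSG00000000002')],
    ['model_id', 'targetInModelId', 'targetFromSourceId'],
)

# The join explodes the single summary row into one evidence row per target, so the result
# is unique on (biologicalModelId, targetFromSourceId, targetInModelId, diseaseFromSourceId).
evidence = (
    disease_model_summary
    .join(model_targets, on='model_id', how='inner')
    .withColumnRenamed('model_id', 'biologicalModelId')
    .withColumnRenamed('disease_id', 'diseaseFromSourceId')
)
evidence.show(truncate=False)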

Code Snippets

import argparse
import logging
import re

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import (
    array_distinct, col, collect_set, concat, element_at, explode, lit, initcap,
    regexp_extract, regexp_replace, size, split, struct, translate, trim, udf, upper, when
)
from pyspark.sql.types import StringType, ArrayType

from common.evidence import write_evidence_strings


ALTERATIONTYPE2FUNCTIONCSQ = {
    # TODO: Map BIA
    'MUT': 'SO_0001777',  # somatic_variant
    'CNA': 'SO_0001563',  # copy_number_change
    'FUS': 'SO_0001882',  # feature_fusion
    'EXPR': None,
    'BIA': None
}

DRUGRESPONSE2EFO = {
    'Responsive': 'GO_0042493',  # response to drug
    'Not Responsive': 'EFO_0020002',  # lack of efficacy
    'Resistant': 'EFO_0020001',  # drug resistance
    'Increased Toxicity': 'EFO_0020003',  # drug toxicity
    'Increased Toxicity (Myelosupression)': 'EFO_0007053',  # myelosuppression
    'Increased Toxicity (Ototoxicity)': 'EFO_0006951',  # ototoxicity
    'Increased Toxicity (Hyperbilirubinemia)': 'HP_0002904',  # Hyperbilirubinemia
    'Increased Toxicity (Haemolytic Anemia)': 'EFO_0005558'  # hemolytic anemia
}

GENENAMESOVERRIDE = {
    # Correct gene names to use their approved symbol
    'C15orf55': 'NUTM1',
    'MLL': 'KMT2A',
    'MLL2': 'KMT2D'
}


class cancerBiomarkersEvidenceGenerator:

    def __init__(self):
        # Create spark session
        self.spark = (
            SparkSession.builder
            .appName('cancer-biomarkers')
            .getOrCreate()
        )

        self.evidence = None

        self.get_variantId_udf = udf(
            cancerBiomarkersEvidenceGenerator.get_variantId,
            StringType()
        )
        self.zip_alterations_with_type_udf = udf(
            cancerBiomarkersEvidenceGenerator.zip_alterations_with_type,
            ArrayType(ArrayType(StringType()))
        )

    def compute(
        self,
        biomarkers_table: str,
        source_table: str,
        disease_table: str,
        drug_index: str,
        output_file: str
    ) -> None:
        """Loads and processes inputs to generate the Cancer Biomarkers evidence strings"""

        # Import data
        biomarkers_df = self.spark.read.csv(biomarkers_table, sep='\t', header=True)
        source_df = self.spark.read.json(source_table).select(
            col('label').alias('niceName'),
            'source', 'url')
        disease_df = self.spark.read.json(disease_table).select(
            regexp_replace(col('name'), '_', '').alias('tumor_type'),
            regexp_extract(col('url'), r'[^/]+$', 0).alias('diseaseFromSourceMappedId'))
        drugs_df = self.spark.read.parquet(drug_index).select(
            col('id').alias('drugId'), col('name').alias('drug'))

        # Process inputs to generate evidence strings
        evidence = self.process_biomarkers(
            biomarkers_df, source_df, disease_df, drugs_df
        )

        # Write evidence strings
        write_evidence_strings(self.evidence, output_file)
        logging.info(f'{evidence.count()} evidence strings have been saved to {output_file}.')

    def process_biomarkers(
        self,
        biomarkers_df: DataFrame,
        source_df: DataFrame,
        disease_df: DataFrame,
        drugs_df: DataFrame
    ) -> DataFrame:
        """The diverse steps to prepare and enrich the input table"""

        biomarkers_enriched = (
            biomarkers_df
            .select(
                'Biomarker', 'IndividualMutation',
                array_distinct(split(col('Alteration'), ';')).alias('alterations'),
                array_distinct(split(col('Gene'), ';')).alias('gene'),
                split(col('AlterationType'), ';').alias('alteration_types'),
                array_distinct(split(col("PrimaryTumorTypeFullName"), ";")).alias('tumor_type_full_name'),
                array_distinct(split(col('Drug'), ';|,')).alias('drug'),
                'DrugFullName', 'Association', 'gDNA',
                array_distinct(split(col('EvidenceLevel'), ',')).alias('confidence'),
                array_distinct(split(col('Source'), ';')).alias('source')
            )
            .withColumn('confidence', explode(col('confidence')))
            .withColumn('tumor_type_full_name', explode(col('tumor_type_full_name')))
            .withColumn('tumor_type', translate(col('tumor_type_full_name'), ' -', ''))
            .withColumn('drug', explode(col('drug')))
            .withColumn('drug', translate(col('drug'), '[]', ''))
            .withColumn('gene', explode(col('gene')))
            .replace(to_replace=GENENAMESOVERRIDE, subset=['gene'])
            .withColumn('gene', upper(col('gene')))
            # At this stage alterations and alteration_types are both arrays
            # Disambiguation when the biomarker consists of multiple alterations is needed
            # This is solved by:
            # 1. Zipping both fields - tmp consists of a list of alteration/type tuples
            # 2. tmp is exploded - tmp consists of the alteration/type tuple
            # 3. alteration & alteration_type columns are overwritten with the elements in the tuple
            .withColumn(
                'tmp',
                self.zip_alterations_with_type_udf(col('alterations'), col('alteration_types')))
            .withColumn('tmp', explode(col('tmp')))
            .withColumn('alteration_type', element_at(col('tmp'), 2))
            .withColumn(
                'alteration',
                when(
                    ~col('IndividualMutation').isNull(),
                    col('IndividualMutation')
                )
                .otherwise(element_at(col('tmp'), 1))
            )
            .drop('tmp')
            # Clean special cases on the alteration string
            .withColumn(
                'alteration',
                when(
                    col('alteration') == 'NRAS:.12.,.13.,.59.,.61.,.117.,.146.',
                    col('Biomarker')  # 'NRAS (12,13,59,61,117,146)'
                )
                .when(
                    # Cleans strings like 'ARAF:.'
                    col('alteration').contains(':.'),
                    translate(col('alteration'), ':.', '')
                )
                .when(
                    # Fusion genes are described with '__'
                    # biomarker is a cleaner representation when there's one alteration
                    (col('alteration').contains('__')) & (~col('Biomarker').contains('+')),
                    col('Biomarker')
                )
                .otherwise(col('alteration'))
            )
            # Split source into literature and urls
            # literature contains PMIDs
            # urls are enriched from the source table if not a CT
            .withColumn('source', explode(col('source')))
            .withColumn('source', trim(regexp_extract(col('source'), r'(PMID:\d+)|([\w ]+)', 0).alias('source')))
            .join(source_df, on='source', how='left')
            .withColumn(
                'literature',
                when(col('source').startswith('PMID'), regexp_extract(col('source'), r'(PMID:)(\d+)', 2))
            )
            .withColumn(
                'urls',
                when(
                    col('source').startswith('NCT'),
                    struct(
                        lit('Clinical Trials').alias('niceName'),
                        concat(lit('https://clinicaltrials.gov/ct2/show/'), col('source')).alias('url')
                    )
                )
                .when(
                    (~col('source').startswith('PMID')) | (~col('source').startswith('NCIT')),
                    struct(col('niceName'), col('url'))
                )
            )
            # The previous conditional clause creates a struct regardless of
            # whether any condition is met. The empty struct is replaced with null
            .withColumn('urls', when(~col('urls.niceName').isNull(), col('urls')))
            # Enrich data
            .withColumn('functionalConsequenceId', col('alteration_type'))
            .replace(to_replace=ALTERATIONTYPE2FUNCTIONCSQ, subset=['functionalConsequenceId'])
            .replace(to_replace=DRUGRESPONSE2EFO, subset=['Association'])
            .join(disease_df, on='tumor_type', how='left')
            .withColumn('drug', upper(col('drug')))
            .withColumn(
                # drug class is coalesced when the precise name of the medicine is not provided
                'drug',
                when(col('drug') == '', col('DrugFullName')).otherwise(col('drug')))
            .join(drugs_df, on='drug', how='left')
            .withColumn('drug', initcap(col('drug')))
            # Translate variantId
            .withColumn(
                'variantId',
                when(~col('gDNA').isNull(), self.get_variantId_udf(col('gDNA')))
            )
            # Assign a GO ID when a gene expression data is reported
            .withColumn(
                'geneExpressionId',
                when(
                    (col('alteration_type') == 'EXPR') & (col('alteration').contains('over')),
                    'GO_0010628'
                )
                .when(
                    (col('alteration_type') == 'EXPR') & (col('alteration').contains('under')),
                    'GO_0010629'
                )
                .when(
                    (col('alteration_type') == 'EXPR') & (col('alteration').contains('norm')),
                    'GO_0010467'
                )
            )
            # Create variant struct
            .withColumn(
                'variant',
                when(
                    col('alteration_type') != 'EXPR',
                    struct(
                        col('alteration').alias('name'),
                        col('variantId').alias('id'),
                        col('functionalConsequenceId')
                    )
                )
            )
            # Create geneExpression struct
            .withColumn(
                'geneExpression',
                when(
                    col('alteration_type') == 'EXPR',
                    struct(
                        col('alteration').alias('name'),
                        col('geneExpressionId').alias('id'))
                )
            )
        )

        pre_evidence = (
            biomarkers_enriched
            .withColumn('datasourceId', lit('cancer_biomarkers'))
            .withColumn('datatypeId', lit('affected_pathway'))
            .withColumnRenamed('tumor_type_full_name', 'diseaseFromSource')
            .withColumnRenamed('drug', 'drugFromSource')
            # diseaseFromSourceMappedId, drugId populated above
            .withColumnRenamed('Association', 'drugResponse')
            # confidence, literature and urls populated above
            .withColumnRenamed('gene', 'targetFromSourceId')
            .withColumnRenamed('Biomarker', 'biomarkerName')
            # variant, geneExpression populated above
            .drop(
                'tumor_type', 'source', 'alteration', 'alteration_type', 'IndividualMutation', 'geneExpressionId',
                'gDNA', 'functionalConsequenceId', 'variantId', 'DrugFullName', 'niceName', 'url')
        )

        # Group evidence
        self.evidence = (
            pre_evidence
            .groupBy('datasourceId', 'datatypeId', 'drugFromSource', 'drugId',
                     'drugResponse', 'targetFromSourceId', 'diseaseFromSource',
                     'diseaseFromSourceMappedId', 'confidence', 'biomarkerName')
            .agg(
                collect_set('literature').alias('literature'),
                collect_set('urls').alias('urls'),
                collect_set('variant').alias('variant'),
                collect_set('geneExpression').alias('geneExpression'),
            )
            # Replace empty lists with null values
            .withColumn('literature', when(size(col('literature')) == 0, lit(None)).otherwise(col('literature')))
            .withColumn('urls', when(size(col('urls')) == 0, lit(None)).otherwise(col('urls')))
            .withColumn('variant', when(size(col('variant')) == 0, lit(None)).otherwise(col('variant')))
            .withColumn(
                'geneExpression',
                when(size(col('geneExpression')) == 0, lit(None))
                .otherwise(col('geneExpression')))
            # Collect variant info into biomarkers struct
            .withColumn(
                'biomarkers',
                struct(
                    'variant',
                    'geneExpression'
                ))
            .drop('variant', 'geneExpression')
            .distinct()
        )

        return self.evidence

    @staticmethod
    def get_variantId(gDNA: str) -> str:
        '''
        Converts the genomic coordinates to the CHROM_POS_REF_ALT notation
        Ex.: 'chr14:g.105243048G_T' --> '14_105243048_G_T'
        '''
        translate_dct = {'chr': '', ':g.': '_', '>': '_', 'del': '_', 'ins': '_'}
        try:
            for k, v in translate_dct.items():
                gDNA = gDNA.replace(k, v)
            x, head, tail = re.split(r'^(.*?_\d+)', gDNA)
            if bool(re.search(r'\d+', tail)):
                tail = re.split(r'^(_\d+_)', tail)[-1]
            return head + '_' + tail
        except AttributeError:
            return

    @staticmethod
    def zip_alterations_with_type(alterations, alteration_type):
        '''
        Zips in a tuple the combination of the alteration w/ its correspondent type
        so that when multiple alterations are reported, these can be disambiguated.
        By expanding the array of alteration types it accounts for the cases when
        several alterations are reported but only one type is given.
        Ex.:
        alterations = ['MET:Y1230C', 'Y1235D']
        alteration_type = ['MUT']
        --> [('MET:Y1230C', 'MUT'), ('Y1235D', 'MUT')]
        '''
        alteration_types = alteration_type * len(alterations)
        return list(zip(alterations, alteration_types))


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('--biomarkers_table',
                        help='Input TSV table containing association data for the biomarkers.', required=True)
    parser.add_argument('--source_table', required=True,
                        help='Input JSON file with the annotations to reference the source of the associations.')
    parser.add_argument('--disease_table',
                        help='Input JSON file with the mapping of the tumor types to EFO IDs.', required=True)
    parser.add_argument('--drug_index', help='Directory of parquet files that stores OT\'s disease index.',
                        required=True)
    parser.add_argument('--output_file', help='Output gzipped json file containing the evidence.', required=True)
    args = parser.parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format='%(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )

    cancerBiomarkersEvidenceGenerator().compute(
        args.biomarkers_table,
        args.source_table,
        args.disease_table,
        args.drug_index,
        args.output_file
    )
import argparse
import logging
import sys

import pyspark.sql.functions as f

from pyspark.sql.dataframe import DataFrame

from common.evidence import initialize_sparksession, write_evidence_strings


def main(chembl_evidence: str, predictions: str, output_file: str) -> None:
    """
    This module adds the studyStopReasonCategories to the ChEMBL evidence as a result of the categorisation of the clinical trial reason to stop.
    Args:
        chembl_evidence: Input gzipped JSON with the evidence submitted by ChEMBL.
        predictions: Input JSON containing the categories of the clinical trial reason to stop.
            Instructions for applying the ML model here: https://github.com/ireneisdoomed/stopReasons.
        output_file: Output gzipped json file containing the ChEMBL evidence with the additional studyStopReasonCategories field.
    """
    logging.info(f'ChEMBL evidence JSON file: {chembl_evidence}')
    logging.info(f'Classes of reason to stop table: {predictions}')

    chembl_df = spark.read.json(chembl_evidence).repartition(200).persist()
    predictions_df = (
        spark.read.json(predictions)
        .transform(prettify_subclasses)
        .distinct()
    )

    # Join datasets
    early_stopped_evd_df = (
        # Evidence with a given reason to stop is always supported by a single NCT ID
        chembl_df.filter(f.col('studyStopReason').isNotNull())
        .select(
            "*", f.explode(f.col("urls.url")).alias("nct_id")
        ).withColumn("nct_id", f.element_at(f.split(f.col("nct_id"), "/"), -1))
        .join(predictions_df, on='nct_id', how='left').drop('nct_id')
        .distinct()
    )

    # We expect that ~10% of evidence strings have a reason to stop assigned
    # It is asserted that this fraction is between 8% and 11% of the total count
    total_count = chembl_df.count()
    early_stopped_count = early_stopped_evd_df.count()

    if not 0.08 < early_stopped_count / total_count < 0.11:
        raise AssertionError(f'The fraction of evidence with a CT reason to stop class is not as expected ({early_stopped_count / total_count}).')

    logging.info('Evidence strings have been processed. Saving...')
    enriched_chembl_df = (
        chembl_df.filter(f.col('studyStopReason').isNull())
        .unionByName(early_stopped_evd_df, allowMissingColumns=True)
    )
    assert enriched_chembl_df.count() == chembl_df.count()
    write_evidence_strings(enriched_chembl_df, output_file)

    logging.info(f'{total_count} evidence strings have been saved to {output_file}. Exiting.')

def prettify_subclasses(predictions_df: DataFrame) -> DataFrame:
    """
    The list of categories is reformatted with nicer, human-readable names.
    """

    CATEGORIESMAPPINGS = {
        'Business_Administrative': 'Business or administrative',
        'Logistics_Resources': 'Logistics or resources',
        'Covid19': 'COVID-19',
        'Safety_Sideeffects': 'Safety or side effects',
        'Endpoint_Met': 'Met endpoint',
        'Insufficient_Enrollment': 'Insufficient enrollment',
        'Negative': 'Negative',
        'Study_Design': 'Study design',
        'Invalid_Reason': 'Invalid reason',
        'Study_Staff_Moved': 'Study staff moved',
        'Another_Study': 'Another study',
        'No_Context': 'No context',
        'Regulatory': 'Regulatory',
        'Interim_Analysis': 'Interim analysis',
        'Success': 'Success',
        'Ethical_Reason': 'Ethical reason',
        'Insufficient_Data': 'Insufficient data',
        'Uncategorised': 'Uncategorised'
    }

    sub_mapping_col = f.map_from_entries(f.array(*[f.struct(f.lit(k), f.lit(v)) for k, v in CATEGORIESMAPPINGS.items()]))

    return (
        predictions_df  
        .select('nct_id', 'subclasses', sub_mapping_col.alias('prettyStopReasonsMap'))
        # Create a MapType column to convert each element of the subclasses array
        .withColumn('studyStopReasonCategories', f.expr('transform(subclasses, x -> element_at(prettyStopReasonsMap, x))'))
        .drop('subclasses', 'prettyStopReasonsMap')
    )

def get_parser():
    """Get parser object for script ChEMBL.py."""
    parser = argparse.ArgumentParser(description=__doc__)

    parser.add_argument(
        '--chembl_evidence',
        help='Input gzipped JSON with the evidence submitted by ChEMBL',
        type=str,
        required=True,
    )
    parser.add_argument(
        '--predictions',
        help='Input JSON containing the categories of the clinical trial reason to stop. Instructions for applying the ML model here: https://github.com/ireneisdoomed/stopReasons.',
        type=str,
        required=True,
    )
    parser.add_argument(
        '--output', help='Output gzipped json file containing the ChEMBL evidence enriched with the studyStopReasonCategories field.',
        type=str,
        required=True
    )
    parser.add_argument(
        '--log_file',
        help='Destination of the logs generated by this script. Defaults to None',
        type=str,
        nargs='?',
        default=None
    )

    return parser


if __name__ == '__main__':
    args = get_parser().parse_args()

    # Logger initializer. If no log_file is specified, logs are written to stderr
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    if args.log_file:
        # Also write the logs to the requested file; basicConfig above already logs to stderr
        logging.getLogger().addHandler(logging.FileHandler(args.log_file))

    global spark
    spark = initialize_sparksession()

    main(
        chembl_evidence=args.chembl_evidence,
        predictions=args.predictions,
        output_file=args.output
    )
import argparse
import logging
from typing import List

import pandas as pd
import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.sql import DataFrame

from common.evidence import initialize_sparksession, write_evidence_strings

PROBES_SETS = [
    "Bromodomains chemical toolbox",
    "Chemical Probes for Understudied Kinases",
    "Chemical Probes.org",
    "Gray Laboratory Probes",
    "High-quality chemical probes",
    "MLP Probes",
    "Nature Chemical Biology Probes",
    "Open Science Probes",
    "opnMe Portal",
    "Probe Miner (suitable probes)",
    "Protein methyltransferases chemical toolbox",
    "SGC Probes",
    "Tool Compound Set",
    "Concise Guide to Pharmacology 2019/20",
    "Kinase Chemogenomic Set (KCGS)",
    "Kinase Inhibitors (best-in-class)",
    "Novartis Chemogenetic Library (NIBR MoA Box)",
    "Nuisance compounds in cellular assays",
]


def collapse_cols_data_in_array(
    df: DataFrame, source_cols: List[str], destination_col: str
) -> DataFrame:
    """Collapses the data in a single column when the information is one-hot encoded.

    Args:
        df (DataFrame): Dataframe containing the data for the different probes
        source_cols (List[str]): List of columns containing the data to be collapsed
        destination_col (str): Name of the column where the array will be stored

    Returns:
        DataFrame: Dataframe with a new column containing the sources that have data for a specific probe

    Examples:
        >>> probes_df = spark.createDataFrame([("A", 1, 0), ("B", 1, 1)], ["probe", "source1", "source2"])
        >>> collapse_cols_data_in_array(probes_df, ["source1", "source2"], "datasource").show()
        +-----+-------+-------+------------------+
        |probe|source1|source2|        datasource|
        +-----+-------+-------+------------------+
        |    A|      1|      0|         [source1]|
        |    B|      1|      1|[source1, source2]|
        +-----+-------+-------+------------------+
        <BLANKLINE>

    """
    # Escape column names containing spaces with backticks; keep the other column names unchanged
    source_cols = [f"`{e}`" if " " in e else e for e in source_cols]
    return df.withColumn(
        destination_col,
        f.array([f.when(df[c] == 1, c.replace(r"`", "")) for c in source_cols]),
    ).withColumn(
        destination_col, f.array_except(f.col(destination_col), f.array(f.lit(None)))
    )


def clean_origin_col():
    """Removes the substring ' probe' from the origin column to just state if the probe has been reported from an experimental or computational approach."""
    return f.array_distinct(
        f.expr("transform(origin, x -> trim(regexp_replace(x, ' probe', '')))")
    )


def extract_hq_flag():
    """Returns a flag indicating if the probe is high-quality or not."""
    return f.when(
        f.array_contains(f.col("datasourceIds"), "High-quality chemical probes"),
        f.lit(True),
    ).otherwise(f.lit(False))


def convert_stringified_array_to_array(col_name: str) -> DataFrame:
    """Converts a column of stringified arrays to an array column.

    Args:
        col_name: Name of the column that contains the stringified array

    Examples:
        >>> df = spark.createDataFrame([("['BI-1829', 'No control']")], t.StringType())
        >>> df.select(convert_stringified_array_to_array("value").alias("res")).show(truncate=False)
        +---------------------+
        |res                  |
        +---------------------+
        |[BI-1829, No control]|
        +---------------------+
        <BLANKLINE>

    """
    return f.split(f.translate(col_name, "[]'", ""), ", ").cast(
        t.ArrayType(t.StringType())
    )


def replace_dash(col_name):
    """Converts to null those values that only contain `-`."""
    return f.when(f.col(col_name).cast(t.StringType()) != "-", f.col(col_name))


def process_scores(col_name):
    """Helper function to refactor the score processing logic."""
    return (
        f.when(f.col(col_name).contains("-"), replace_dash(col_name))
        .when(f.col(col_name) == 0, f.lit(None))
        .otherwise(f.col(col_name))
    ).cast(t.IntegerType())


def process_probes_data(probes_excel: str) -> List[DataFrame]:
    """Metadata about the compound and the scores given by the different sources."""
    return (
        spark.createDataFrame(
            pd.read_excel(
                probes_excel,
                sheet_name="PROBES",
                header=0,
                index_col=0,
            )
            # Probes that do not have an associated target are marked as nulls
            .query("target.notnull()")
            .reset_index()
            .drop("control_smiles", axis=1)
        )
        # Collect list of datasources for each probe
        .transform(
            lambda df: collapse_cols_data_in_array(df, PROBES_SETS, "datasourceIds")
        )
        # Collecting the list of detection methods of the probe
        .transform(
            lambda df: collapse_cols_data_in_array(
                df,
                ["experimental probe", "calculated probe"],
                "origin",
            )
        ).select(
            "pdid",
            f.col("compound_name").alias("id"),
            clean_origin_col().alias("origin"),
            # Flag the high-quality probes and remove this from the list of datasources
            extract_hq_flag().alias("isHighQuality"),
            f.explode(
                f.array_except(
                    f.col("datasourceIds"),
                    f.array(f.lit("High-quality chemical probes")),
                )
            ).alias("datasourceId"),
            replace_dash("control_name").alias("control"),
        )
    )


def process_probes_targets_data(probes_excel: str) -> DataFrame:
    """Collection of targets associated with the probes and their scores."""
    return (
        spark.createDataFrame(
            pd.read_excel(
                probes_excel, sheet_name="PROBES TARGETS", header=0, index_col=0
            )
            # Probes that do not have an associated target are marked with "-"
            .query("gene_name != '-'")
            .reset_index()
            .drop("control_smiles", axis=1)
        )
        .filter(f.col("organism") == "Homo sapiens")
        .withColumn(
            "mechanismOfAction",
            f.when(
                f.col("action") != "-",
                f.split(f.col("action"), ";"),
            ),
        )
        .select(
            "pdid",
            f.col("target").alias("targetFromSource"),
            "mechanismOfAction",
            process_scores("`P&D probe-likeness score`").alias("probesDrugsScore"),
            process_scores("`Probe Miner Score`").alias("probeMinerScore"),
            process_scores("`Cells score (Chemical Probes.org)`").alias("scoreInCells"),
            process_scores("`Organisms score (Chemical Probes.org)`").alias(
                "scoreInOrganisms"
            ),
        )
    )


def process_probes_sets_data(probes_excel: str) -> DataFrame:
    """Metadata about the different sources of probes."""
    return (
        spark.createDataFrame(
            pd.read_excel(
                probes_excel, sheet_name="COMPOUNDSETS", header=0, index_col=0
            )
        )
        .selectExpr("COMPOUNDSET as datasourceId", "SOURCE_URL as url")
        .filter(f.col("url").startswith("http"))
    )


def process_targets_xrefs(probes_excel: str) -> DataFrame:
    """Look-up table between the gene symbols and the UniProt IDs."""
    return spark.createDataFrame(
        pd.read_excel(
            probes_excel, sheet_name="TARGETS", header=0, index_col=0
        ).reset_index()
    ).selectExpr("target as targetFromSource", "uniprot as targetFromSourceId")


def process_drugs_xrefs(drugs_xrefs: str) -> DataFrame:
    """Look-up table between the probes IDs in P&Ds and ChEMBL."""
    return (
        spark.read.csv(drugs_xrefs, header=True)
        .selectExpr("pdid", "ChEMBL as drugId")
        .filter(f.col("drugId").isNotNull())
    )


def main(probes_excel: str, drugs_xrefs: str) -> DataFrame:
    """Main logic of the script."""

    probes_df = process_probes_data(probes_excel)
    probes_targets_df = process_probes_targets_data(probes_excel)
    probes_sets_df = process_probes_sets_data(probes_excel)
    targets_xref_df = process_targets_xrefs(probes_excel)
    drugs_xref_df = process_drugs_xrefs(drugs_xrefs)

    grouping_cols = [
        "targetFromSourceId",
        "id",
        "drugId",
        "mechanismOfAction",
        "origin",
        "control",
        "isHighQuality",
        "probesDrugsScore",
        "probeMinerScore",
        "scoreInCells",
        "scoreInOrganisms",
    ]

    return (
        probes_targets_df.join(probes_df, on="pdid", how="left")
        .join(targets_xref_df, on="targetFromSource", how="left")
        .join(probes_sets_df, on="datasourceId", how="left")
        .join(drugs_xref_df, on="pdid", how="left")
        .groupBy(grouping_cols)
        .agg(
            f.collect_set(
                f.struct(
                    f.col("datasourceId").alias("niceName"), f.col("url").alias("url")
                )
            ).alias("urls")
        )
    )


def get_parser():
    """Get parser object for script ChemicalProbes.py."""
    parser = argparse.ArgumentParser(description=__doc__)

    parser.add_argument(
        "--probes_excel_path",
        help="Path to the Probes&Drugs Probes XLSX that contains the main tables.",
        type=str,
        required=True,
    )
    parser.add_argument(
        "--probes_mappings_path",
        help="Path to the Probes&Drugs Compounds ID mapping standardised CSV that contains mappings between probes and ChEMBLIDs.",
        type=str,
        required=True,
    )
    parser.add_argument(
        "--output",
        help="Output gzipped json file following the chemical probes data model.",
        type=str,
        required=True,
    )

    return parser


if __name__ == "__main__":
    args = get_parser().parse_args()

    logging.basicConfig(
        level=logging.INFO,
        format="%(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    global spark
    spark = initialize_sparksession()

    out = main(
        probes_excel=args.probes_excel_path, drugs_xrefs=args.probes_mappings_path
    )
    write_evidence_strings(out, args.output)
    logging.info(f"Probes dataset has been saved to {args.output}. Exiting.")
import argparse
import logging

from pyspark.conf import SparkConf
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import array, col, lit, monotonically_increasing_id, struct, trim
from pyspark.sql.types import StringType, StructType, TimestampType

from common.ontology import add_efo_mapping
from common.evidence import write_evidence_strings


def main(input_file: str, output_file: str, cache_dir: str, local: bool = False) -> None:

    # Initialize spark session
    if local:
        sparkConf = (
            SparkConf()
            .set('spark.driver.memory', '15g')
            .set('spark.executor.memory', '15g')
            .set('spark.driver.maxResultSize', '0')
            .set('spark.debug.maxToStringFields', '2000')
            .set('spark.sql.execution.arrow.maxRecordsPerBatch', '500000')
        )
        spark = (
            SparkSession.builder
            .config(conf=sparkConf)
            .master('local[*]')
            .getOrCreate()
        )
    else:
        sparkConf = (
            SparkConf()
            .set('spark.driver.maxResultSize', '0')
            .set('spark.debug.maxToStringFields', '2000')
            .set('spark.sql.execution.arrow.maxRecordsPerBatch', '500000')
        )
        spark = (
            SparkSession.builder
            .config(conf=sparkConf)
            .getOrCreate()
        )

    # Read and process Clingen's table into evidence strings

    clingen_df = read_input_file(input_file, spark_instance=spark)
    logging.info('Gene Validity Curations table has been imported. Processing evidence strings.')

    evidence_df = process_clingen(clingen_df)

    evidence_df = add_efo_mapping(evidence_strings=evidence_df, spark_instance=spark, ontoma_cache_dir=cache_dir)
    logging.info('Disease mappings have been added.')

    write_evidence_strings(evidence_df, output_file)
    logging.info(f'{evidence_df.count()} evidence strings have been saved to {output_file}')


def read_input_file(input_file: str, spark_instance) -> DataFrame:
    """
    Reads Gene Validity Curations CSV file into a Spark DataFrame forcing the schema
    The first 6 rows of this file include metadata that needs to be dropped
    """

    clingen_schema = (
        StructType()
        .add('GENE SYMBOL', StringType())
        .add('GENE ID (HGNC)', StringType())
        .add('DISEASE LABEL', StringType())
        .add('DISEASE ID (MONDO)', StringType())
        .add('MOI', StringType())
        .add('SOP', StringType())
        .add('CLASSIFICATION', StringType())
        .add('ONLINE REPORT', StringType())
        .add('CLASSIFICATION DATE', TimestampType())
        .add('GCEP', StringType())
    )

    clingen_df = (
        spark_instance.read.csv(input_file, schema=clingen_schema)
        # The first 6 rows of the GVC file include metadata that needs to be dropped
        .withColumn('idx', monotonically_increasing_id())
        .filter(col('idx') > 5)
        .drop('idx')
    )

    return clingen_df


def process_clingen(clingen_df: DataFrame) -> DataFrame:
    """
    The JSON Schema format is applied to the df
    """

    return (
        clingen_df
        .withColumn('datasourceId', lit('clingen'))
        .withColumn('datatypeId', lit('genetic_literature'))
        .withColumn('urls', struct(col('ONLINE REPORT').alias('url')))

        .select(
            'datasourceId', 'datatypeId',
            trim(col('GENE SYMBOL')).alias('targetFromSourceId'),
            col('DISEASE LABEL').alias('diseaseFromSource'),
            col('DISEASE ID (MONDO)').alias('diseaseFromSourceId'),
            array(col('MOI')).alias('allelicRequirements'),
            array(col('urls')).alias('urls'),
            col('CLASSIFICATION').alias('confidence'),
            col('GCEP').alias('studyId')
        )
    )


if __name__ == "__main__":

    # Parse CLI arguments
    parser = argparse.ArgumentParser(description='Parse ClinGen gene-disease associations from Gene Validity Curations')
    parser.add_argument('--input_file',
                        help='Name of csv file downloaded from https://search.clinicalgenome.org/kb/gene-validity',
                        type=str, required=True)
    parser.add_argument('--output_file',
                        help='Absolute path of the gzipped JSON evidence file.',
                        type=str, required=True)
    parser.add_argument('--log_file', type=str,
                        help='Optional filename to redirect the logs into.')
    parser.add_argument('--cache_dir', required=False, help='Directory to store the OnToma cache files in.')
    parser.add_argument(
        '--local', action='store_true', required=False, default=False,
        help='Flag to indicate if the script is executed locally or on the cluster'
    )
    args = parser.parse_args()

    # Initialize logging:
    logging.basicConfig(
        level=logging.INFO,
        format='%(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    if args.log_file:
        # Also write the logs to the requested file; basicConfig above already logs to stderr
        logging.getLogger().addHandler(logging.FileHandler(args.log_file))

    # Report input data:
    logging.info(f'Clingen input file path: {args.input_file}')
    logging.info(f'Evidence output file path: {args.output_file}')
    logging.info(f'Cache directory: {args.cache_dir}')

    main(
        input_file=args.input_file, output_file=args.output_file,
        cache_dir=args.cache_dir, local=args.local
    )
import argparse
import logging
import sys

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import FloatType
from pyspark.sql.functions import col, collect_set, split, element_at, struct, lit

from common.evidence import write_evidence_strings

# A few genes do not have Ensembl IDs in the data file provided
CRISPR_SYMBOL_MAPPING = {
    'CASC5': 'ENSG00000137812',
    'CIRH1A': 'ENSG00000141076',
    'EFTUD1': 'ENSG00000140598',
    'ENSG00000163660': 'ENSG00000163660',
    'KIAA0947': 'ENSG00000164151',
    'KIAA1432': 'ENSG00000107036',
    'NDNL2': 'ENSG00000185115',
    'SRPR': 'ENSG00000182934',
    'ZNF259': 'ENSG00000109917'
}


def main(desc_file, evid_file, cell_file, out_file):
    sparkConf = (
        SparkConf()
        .set('spark.driver.memory', '15g')
        .set('spark.executor.memory', '15g')
        .set('spark.driver.maxResultSize', '0')
        .set('spark.debug.maxToStringFields', '2000')
        .set('spark.sql.execution.arrow.maxRecordsPerBatch', '500000')
    )
    spark = (
        SparkSession.builder
        .config(conf=sparkConf)
        .master('local[*]')
        .getOrCreate()
    )

    # Log parameters:
    logging.info(f'Evidence file: {evid_file}')
    logging.info(f'Description file: {desc_file}')
    logging.info(f'Cell type annotation: {cell_file}')
    logging.info(f'Output file: {out_file}')

    # Read files:
    evidence_df = (
        spark.read.csv(evid_file, sep='\t', header=True)
        .drop('pmid', 'gene_set_name', 'disease_name')
    )
    cell_lines_df = spark.read.csv(cell_file, sep='\t', header=True)
    description_df = spark.read.csv(desc_file, sep='\t', header=True)

    # Logging dataframe stats:
    logging.info(f'Number of evidence: {evidence_df.count()}')
    logging.info(f'Number of descriptions: {description_df.count()}')
    logging.info(f'Number of cell/tissue annotation: {cell_lines_df.count()}')

    # Tissues and cancer types are annotated together in the same column (tissue_or_cancer_type)
    # To disambiguate one from another, the column is combined with the cell lines
    # First on the tissue level:
    tissue_desc = (
        description_df
        .withColumnRenamed('tissue_or_cancer_type', 'tissue')
        .join(cell_lines_df, on='tissue', how='inner')
    )

    # And then on the disease level:
    cell_desc = (
        description_df
        .withColumnRenamed('tissue_or_cancer_type', 'diseaseFromSource')
        .join(cell_lines_df, on='diseaseFromSource', how='inner')
    )

    merged_annotation = (
        # Concatenating the above generated dataframes:
        cell_desc
        .union(tissue_desc)

        # Aggregating by disease and method:
        .groupBy('diseaseFromSource', 'efo_id', 'method')

        # The cell annotation is aggregated in a list of struct:
        .agg(
            collect_set(struct(
                col('name'), col('id'), col('tissue'), col('tissueId')
            )).alias('diseaseCellLines')
        )
        .drop('method')
    )

    # Joining merged annotation with evidence:
    pooled_evidence_df = (
        evidence_df
        .select(
            col('target_id').alias('targetFromSourceId'),
            col('disease_id').alias('efo_id'),
            col('score').alias('resourceScore').cast(FloatType()),
        )

        # Some of the target identifier are not Ensembl Gene id - replace them:
        .replace(to_replace=CRISPR_SYMBOL_MAPPING, subset=['targetFromSourceId'])

        # Merging with descriptions:
        .join(merged_annotation, on='efo_id', how='outer')

        # From EFO uri, generate EFO id:
        .withColumn(
            'diseaseFromSourceMappedId',
            element_at(split(col('efo_id'), '/'), -1)
        )
        .drop('efo_id')

        # Adding constants:
        .withColumn('datasourceId', lit('crispr'))
        .withColumn('datatypeId', lit('affected_pathway'))
        .persist()
    )

    logging.info(f'Saving {pooled_evidence_df.count()} CRISPR evidence in JSON format, to: {out_file}')

    write_evidence_strings(pooled_evidence_df, out_file)


if __name__ == "__main__":

    # Parse CLI arguments:
    parser = argparse.ArgumentParser(
        description='Parse essential cancer genes identified using CRISPR assays and prioritised in Project Score'
    )
    parser.add_argument('-d', '--descriptions_file',
                        help='Name of tsv file with the description of the method per cancer type',
                        type=str, required=True)
    parser.add_argument('-e', '--evidence_file',
                        help='Name of tsv file with the priority score',
                        type=str, required=True)
    parser.add_argument('-c', '--cell_types_file',
                        help='Name of tsv file with cell line names per cancer type',
                        type=str, required=True)
    parser.add_argument('-o', '--output_file',
                        help='Name of evidence file. (gzip compressed json)',
                        type=str, required=True)
    parser.add_argument('-l', '--log_file',
                        help='Name of log file. If not provided, logs are written to standard error.',
                        type=str, required=False)

    args = parser.parse_args()
    desc_file = args.descriptions_file
    evid_file = args.evidence_file
    cell_file = args.cell_types_file
    out_file = args.output_file

    # If no logfile is specified, logs are written to stderr
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        filename=args.log_file,
    )

    main(desc_file, evid_file, cell_file, out_file)
import argparse
from functools import reduce
import logging
import sys
from pyspark.sql import SparkSession, DataFrame

from common.evidence import initialize_sparksession, write_evidence_strings
from BrainCRISPR import CRISPRBrain


def crispr_brain_wrapper(spark: SparkSession, crispr_brain_mapping: str) -> DataFrame:
    """Simple wrapper around CRISPRBrain class to generate and extract evidence."""
    crispr_brain_parser = CRISPRBrain(spark, crispr_brain_mapping)
    crispr_brain_parser.create_crispr_brain_evidence()
    return crispr_brain_parser.get_dataframe()


def main(spark: SparkSession, crispr_brain_mapping: str) -> DataFrame:
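    """Collect evidence from all CRISPR screen sources and combine them into a single dataframe."""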
    crispr_screen_evidence_sets = [
        # Generate evidence from CRISPR Brain:
        crispr_brain_wrapper(spark, crispr_brain_mapping).persist(),
        # Further datasets can be added here:
    ]

    # Combining all sources into one single dataframe:
    combined_evidence = reduce(
        lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True),
        crispr_screen_evidence_sets,
    )

    logging.info(f"Total number of CRISPR screen evidence: {combined_evidence.count()}")
    return combined_evidence


def get_parser():
    """Parsing command line argument for crispr screen ingestion."""
    parser = argparse.ArgumentParser(description=__doc__)

    parser.add_argument(
        "--crispr_brain_mapping",
        help="Curation for CRISPR Brain dataset containing disease mapping and contrast.",
        type=str,
        required=True,
    )
    parser.add_argument(
        "--output",
        help="Output gzipped json file.",
        type=str,
        required=True,
    )
    parser.add_argument(
        "--log_file",
        help="Destination of the logs generated by this script. Defaults to None",
        type=str,
        nargs="?",
        default=None,
    )
    return parser


if __name__ == "__main__":
    args = get_parser().parse_args()

    # Logger initializer. If no log_file is specified, logs are written to stderr
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        filename=args.log_file,
    )

    spark = initialize_sparksession()

    evidence_df = main(
        spark,
        args.crispr_brain_mapping,
    )

    write_evidence_strings(evidence_df, args.output)
    logging.info(f"Evidence strings have been saved to {args.output}. Exiting.")
import argparse
import logging
import sys

from pyspark.conf import SparkConf
from pyspark.sql import DataFrame, SparkSession, functions as f, types as t

from common.ontology import add_efo_mapping
from common.evidence import initialize_sparksession, write_evidence_strings


G2P_mutationCsq2functionalCsq = {
    "uncertain": "SO_0002220",  # function_uncertain_variant
    "absent gene product": "SO_0002317",  # absent_gene_product
    "altered gene product structure": "SO_0002318",  # altered_gene_product_structure
    "5_prime or 3_prime UTR mutation": "SO_0001622",  # UTR_variant
    "increased gene product level": "SO_0002315",  # increased_gene_product_level
    "cis-regulatory or promotor mutation": "SO_0001566",  # regulatory_region_variant
}


def main(
    dd_file: str,
    eye_file: str,
    skin_file: str,
    cancer_file: str,
    cardiac_file: str,
    output_file: str,
    cache_dir: str,
    local: bool = False,
) -> None:
    # Initialize spark session
    spark = initialize_sparksession()

    # Read and process G2P's tables into evidence strings
    gene2phenotype_df = read_input_files(
        spark, dd_file, eye_file, skin_file, cancer_file, cardiac_file
    )
    logging.info(
        "Gene2Phenotype panels have been imported. Processing evidence strings."
    )

    evidence_df = process_gene2phenotype(gene2phenotype_df)

    evidence_df = add_efo_mapping(
        evidence_strings=evidence_df, ontoma_cache_dir=cache_dir, spark_instance=spark
    )
    logging.info("Disease mappings have been added.")

    # Saving data:
    write_evidence_strings(evidence_df, output_file)
    logging.info(
        f"{evidence_df.count()} evidence strings have been saved to {output_file}"
    )


def read_input_files(
    spark: SparkSession,
    dd_file: str,
    eye_file: str,
    skin_file: str,
    cancer_file: str,
    cardiac_file: str,
) -> DataFrame:
    """
    Reads G2P's panel CSV files into a Spark DataFrame, enforcing the schema
    """

    gene2phenotype_schema = (
        t.StructType()
        .add("gene symbol", t.StringType())
        .add("gene mim", t.IntegerType())
        .add("disease name", t.StringType())
        .add("disease mim", t.StringType())
        .add("confidence category", t.StringType())
        .add("allelic requirement", t.StringType())
        .add("mutation consequence", t.StringType())
        .add("phenotypes", t.StringType())
        .add("organ specificity list", t.StringType())
        .add("pmids", t.StringType())
        .add("panel", t.StringType())
        .add("prev symbols", t.StringType())
        .add("hgnc id", t.IntegerType())
        .add("gene disease pair entry date", t.TimestampType())
        .add("cross cutting modifier", t.StringType())
        .add("mutation consequence flag", t.StringType())
        .add("confidence value flag", t.StringType())
        .add("comments", t.StringType())
        .add("variant consequence", t.StringType())
        .add("disease ontology", t.StringType())
    )

    return (
        spark.read.option("multiLine", True)
        .option("encoding", "UTF-8")
        .csv(
            [dd_file, eye_file, skin_file, cancer_file, cardiac_file],
            schema=gene2phenotype_schema,
            enforceSchema=True,
            header=True,
            sep=",",
            quote='"',
        )
        # Dropping one incomplete evidence due to parsing problem:
        # All data: 4094
        # Filtered data: 4093
        .filter(f.col("gene symbol").isNotNull() & f.col("panel").isNotNull())
    )


def process_gene2phenotype(gene2phenotype_df: DataFrame) -> DataFrame:
    """
    Transform the Gene2Phenotype dataframe into evidence strings following the JSON schema
    """

    return gene2phenotype_df.select(
        # Split pubmed IDs to list:
        f.split(f.col("pmids"), ";").alias("literature"),
        # Phenotypes are excluded from the schema:
        # f.split(f.col("phenotypes"), ";").alias("phenotypes"),
        # Renaming a few columns:
        f.col("gene symbol").alias("targetFromSourceId"),
        f.col("panel").alias("studyId"),
        f.col("confidence category").alias("confidence"),
        # Parsing allelic requirements:
        f.when(
            f.col("allelic requirement").isNotNull(),
            f.array(f.col("allelic requirement")),
        ).alias("allelicRequirements"),
        # Parsing disease from source identifier:
        f.when(f.col("disease ontology").isNotNull(), f.col("disease ontology"))
        .when(
            (~f.col("disease mim").contains("No disease mim"))
            & (f.col("disease mim").isNotNull()),
            f.concat(f.lit("OMIM:"), f.col("disease mim")),
        )
        .otherwise(f.lit(None))
        .alias("diseaseFromSourceId"),
        # Cleaning disease names:
        f.regexp_replace(f.col("disease name"), r".+-related ", "").alias(
            "diseaseFromSource"
        ),
        # Map functional consequences:
        translate(G2P_mutationCsq2functionalCsq)("mutation consequence").alias(
            "variantFunctionalConsequenceId"
        ),
        # Adding constant columns:
        f.lit("gene2phenotype").alias("datasourceId"),
        f.lit("genetic_literature").alias("datatypeId"),
    )


def translate(mapping):
    """
    Create a UDF that maps mutation consequence terms to Sequence Ontology (SO) codes
    """

    def translate_(col):
        return mapping.get(col)

    return f.udf(translate_, t.StringType())
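

def _consequence_mapping_expr():
    """Sketch only (illustrative, not used by this parser): a Spark-native alternative
    to the `translate` UDF above, building a literal map expression from
    G2P_mutationCsq2functionalCsq. A lookup would then read
    `_consequence_mapping_expr()[f.col("mutation consequence")]` inside a select.
    """
    from itertools import chain

    return f.create_map(
        [f.lit(value) for value in chain(*G2P_mutationCsq2functionalCsq.items())]
    )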


if __name__ == "__main__":
    # Parse CLI arguments
    parser = argparse.ArgumentParser(
        description="Parse Gene2Phenotype gene-disease files downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads/"
    )
    parser.add_argument(
        "-d",
        "--dd_panel",
        help="DD panel file downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads",
        required=True,
        type=str,
    )
    parser.add_argument(
        "-e",
        "--eye_panel",
        help="Eye panel file downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads",
        required=True,
        type=str,
    )
    parser.add_argument(
        "-s",
        "--skin_panel",
        help="Skin panel file downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads",
        required=True,
        type=str,
    )
    parser.add_argument(
        "-c",
        "--cancer_panel",
        help="Cancer panel file downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads",
        required=True,
        type=str,
    )
    parser.add_argument(
        "-cr",
        "--cardiac_panel",
        help="Cardiac panel file downloaded from https://www.ebi.ac.uk/gene2phenotype/downloads",
        required=True,
        type=str,
    )
    parser.add_argument(
        "-o",
        "--output_file",
        help="Absolute path of the gzipped, JSON evidence file.",
        required=True,
        type=str,
    )
    parser.add_argument(
        "-l", "--log_file", help="Filename to store the parser logs.", type=str
    )
    parser.add_argument(
        "--cache_dir",
        required=False,
        help="Directory to store the OnToma cache files in.",
    )
    parser.add_argument(
        "--local",
        action="store_true",
        required=False,
        default=False,
        help="Flag to indicate if the script is executed locally or on the cluster",
    )

    args = parser.parse_args()

    # Get parameters
    dd_file = args.dd_panel
    eye_file = args.eye_panel
    skin_file = args.skin_panel
    cancer_file = args.cancer_panel
    cardiac_file = args.cardiac_panel
    output_file = args.output_file
    log_file = args.log_file
    cache_dir = args.cache_dir
    local = args.local

    # Configure logger:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
        filename=log_file,
    )

    # Report input data:
    logging.info(f"DD panel file: {dd_file}")
    logging.info(f"Eye panel file: {eye_file}")
    logging.info(f"Skin panel file: {skin_file}")
    logging.info(f"Cancer panel file: {cancer_file}")
    logging.info(f"Cardiac panel file: {cardiac_file}")

    # Calling main:
    main(
        dd_file, eye_file, skin_file, cancer_file, cardiac_file, output_file, cache_dir
    )
import argparse
from functools import partial, reduce
import logging
import sys

from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as T

from common.evidence import initialize_sparksession, write_evidence_strings
from AzGeneBurden import main as process_az_gene_burden
from GenebassGeneBurden import main as process_genebass_gene_burden


def main(
    az_binary_data: str,
    az_quant_data: str,
    curated_data: str,
    genebass_data: str,
) -> DataFrame:
    """This module brings together and exports target/disease evidence generated by AzGeneBurden.py and RegeneronGeneBurdeb.py."""

    burden_evidence_sets = [
        # Generate evidence from AZ data:
        process_az_gene_burden(az_binary_data, az_quant_data).persist(),
        # Generate evidence from manual data:
        process_gene_burden_curation(curated_data).persist(),
        # Generate evidence from GeneBass data:
        process_genebass_gene_burden(genebass_data).persist(),
    ]

    unionByDiffSchema = partial(DataFrame.unionByName, allowMissingColumns=True)
    evd_df = reduce(unionByDiffSchema, burden_evidence_sets).distinct()
    logging.info(f'Total number of gene_burden evidence: {evd_df.count()}')

    return evd_df


def process_gene_burden_curation(curated_data: str) -> DataFrame:
    """Process manual gene burden evidence."""

    logging.info(f'File with the curated burden associations: {curated_data}')
    manual_df = read_gene_burden_curation(curated_data)
    logging.info(f'Total number of imported manual gene_burden evidence: {manual_df.count()}')

    manual_df = (
        manual_df
        # The columns largely follow the schema; only minor parsing is needed
        # 1. Confidence intervals are assigned to the odds ratio or the beta, depending on which is present
        .withColumn(
            'oddsRatioConfidenceIntervalLower', F.when(F.col('oddsRatio').isNotNull(), F.col('ConfidenceIntervalLower'))
        )
        .withColumn(
            'oddsRatioConfidenceIntervalUpper', F.when(F.col('oddsRatio').isNotNull(), F.col('ConfidenceIntervalUpper'))
        )
        .withColumn('betaConfidenceIntervalLower', F.when(F.col('beta').isNotNull(), F.col('ConfidenceIntervalLower')))
        .withColumn('betaConfidenceIntervalUpper', F.when(F.col('beta').isNotNull(), F.col('ConfidenceIntervalUpper')))
        .drop('ConfidenceIntervalLower', 'ConfidenceIntervalUpper')
        # 2. Collect PMID and allelic requirements in an array
        .withColumn('literature', F.array(F.col('literature')))
        .withColumn(
            'allelicRequirements',
            F.when(F.col('allelicRequirements').isNotNull(), F.array(F.col('allelicRequirements'))),
        )
        # 3. Split the sex column to form an array
        .withColumn('sex', F.split(F.col('sex'), ', '))
        # 4. Add hardcoded values and drop URLs (will be handled by the FE) and HGNC symbols
        .withColumn('datasourceId', F.lit('gene_burden'))
        .withColumn('datatypeId', F.lit('genetic_association'))
        .drop('url', 'targetFromSource')
        .distinct()
    )

    return manual_df


def read_gene_burden_curation(curated_data: str) -> DataFrame:
    """Read manual gene burden curation from remote to a Spark DataFrame."""

    schema = T.StructType(
        [
            T.StructField('projectId', T.StringType(), True),
            T.StructField('targetFromSource', T.StringType(), True),
            T.StructField('targetFromSourceId', T.StringType(), True),
            T.StructField('diseaseFromSource', T.StringType(), True),
            T.StructField('diseaseFromSourceMappedId', T.StringType(), True),
            T.StructField('resourceScore', T.DoubleType(), True),
            T.StructField('pValueMantissa', T.DoubleType(), True),
            T.StructField('pValueExponent', T.IntegerType(), True),
            T.StructField('oddsRatio', T.DoubleType(), True),
            T.StructField('ConfidenceIntervalLower', T.DoubleType(), True),
            T.StructField('ConfidenceIntervalUpper', T.DoubleType(), True),
            T.StructField('beta', T.DoubleType(), True),
            T.StructField('sex', T.StringType(), True),
            T.StructField('ancestry', T.StringType(), True),
            T.StructField('ancestryId', T.StringType(), True),
            T.StructField('cohortId', T.StringType(), True),
            T.StructField('studySampleSize', T.IntegerType(), True),
            T.StructField('studyCases', T.IntegerType(), True),
            T.StructField('studyCasesWithQualifyingVariants', T.IntegerType(), True),
            T.StructField('allelicRequirements', T.StringType(), True),
            T.StructField('studyId', T.StringType(), True),
            T.StructField('statisticalMethod', T.StringType(), True),
            T.StructField('statisticalMethodOverview', T.StringType(), True),
            T.StructField('literature', T.StringType(), True),
            T.StructField('url', T.StringType(), True),
        ]
    )

    # Make the remote file available locally, then read the copy distributed via SparkFiles.
    spark = SparkSession.getActiveSession()
    spark.sparkContext.addFile(curated_data)

    return spark.read.csv(
        SparkFiles.get(curated_data.split('/')[-1]), sep='\t', header=True, schema=schema
    )


def get_parser():
    """Get parser object for script GeneBurden.py."""
    parser = argparse.ArgumentParser(description=__doc__)

    parser.add_argument(
        '--az_binary_data',
        help='Input parquet files with AZ\'s PheWAS associations of binary traits.',
        type=str,
        required=True,
    )
    parser.add_argument(
        '--az_quant_data',
        help='Input parquet files with AZ\'s PheWAS associations of quantitative traits.',
        type=str,
        required=True,
    )
    parser.add_argument(
        '--curated_data',
        help='Input remote CSV file containing the gene burden manual curation.',
        type=str,
        required=True,
    )
    parser.add_argument(
        '--genebass_data',
        help='Input parquet files with Genebass\'s burden associations.',
        type=str,
        required=True,
    )
    parser.add_argument(
        '--output',
        help='Output gzipped json file following the gene_burden evidence data model.',
        type=str,
        required=True,
    )
    parser.add_argument(
        '--log_file',
        help='Destination of the logs generated by this script. Defaults to None',
        type=str,
        nargs='?',
        default=None,
    )

    return parser


if __name__ == "__main__":
    args = get_parser().parse_args()

    # Logger initializer. If no log_file is specified, logs are written to stderr
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        filename=args.log_file,
    )

    spark = initialize_sparksession()

    evd_df = main(
        az_binary_data=args.az_binary_data,
        az_quant_data=args.az_quant_data,
        curated_data=args.curated_data,
        genebass_data=args.genebass_data,
    )

    write_evidence_strings(evd_df, args.output)
    logging.info(f'Evidence strings have been saved to {args.output}. Exiting.')
import argparse
import logging
import os
import pathlib
import shutil
import tempfile
import urllib.request

import pronto
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import Window
import pyspark.sql.functions as pf
from pyspark.sql.types import StructType, StructField, StringType
import requests
from retry import retry

from common.ontology import add_efo_mapping
from common.evidence import detect_spark_memory_limit, write_evidence_strings


# The tables and their fields to fetch from SOLR. Other tables (not currently used): gene, disease_gene_summary.
IMPC_SOLR_TABLES = {
    # Mouse to human mappings.
    'gene_gene': ('gene_id', 'hgnc_gene_id'),
    'ontology_ontology': ('mp_id', 'hp_id'),
    # Mouse model and disease data.
    'mouse_model': ('model_id', 'model_phenotypes'),
    'disease': ('disease_id', 'disease_phenotypes'),
    'disease_model_summary': ('model_id', 'model_genetic_background', 'model_description', 'disease_id', 'disease_term',
                              'disease_model_avg_norm', 'disease_model_max_norm', 'marker_id'),
    'ontology': ('ontology', 'phenotype_id', 'phenotype_term'),
}
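# Note: a field may also be given as 'solr_field > renamed_column' to rename it on load
# (see `query_solr` and `load_solr_csv` below); the entries above use the SOLR names as-is.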


# List of fields on which to enforce uniqueness by only keeping the record with the highest score.
UNIQUE_FIELDS = [
    # Specific to IMPC.
    'diseaseFromSource',  # Original disease name.
    'targetInModel',  # Mouse gene name.
    'biologicalModelAllelicComposition',  # Mouse model property.
    'biologicalModelGeneticBackground',  # Mouse model property.

    # General.
    'diseaseFromSourceMappedId',  # EFO mapped disease ID.
    'targetFromSourceId',  # Ensembl mapped human gene ID.
]

class ImpcSolrRetriever:
    """Retrieve data from the IMPC SOLR API and save the CSV files to the specified location."""

    # Mouse model data from IMPC SOLR.
    IMPC_SOLR_HOST = 'http://www.ebi.ac.uk/mi/impc/solr/phenodigm/select'

    # The largest table is about 7 million records. The one billion limit is used as an arbitrary high number to
    # retrieve all records in one large request, which maximises the performance.
    IMPC_SOLR_BATCH_SIZE = 1000000000
    IMPC_SOLR_TIMEOUT = 3600

    # The decorator ensures that the requests are retried in case of network or server errors.
    @retry(tries=3, delay=5, backoff=1.2, jitter=(1, 3))
    def get_number_of_solr_records(self, data_type):
        params = {'q': '*:*', 'fq': f'type:{data_type}', 'rows': 0}
        response = requests.get(self.IMPC_SOLR_HOST, params=params, timeout=self.IMPC_SOLR_TIMEOUT)
        response.raise_for_status()  # Check for HTTP errors. This will be caught by @retry.
        return response.json()['response']['numFound']

    @retry(tries=3, delay=5, backoff=1.2, jitter=(1, 3))
    def query_solr(self, data_type, start):
        """Request one batch of SOLR records of the specified data type and write it into a temporary file."""
        list_of_columns = [column.split(' > ')[0] for column in IMPC_SOLR_TABLES[data_type]]
        params = {'q': '*:*', 'fq': f'type:{data_type}', 'start': start, 'rows': self.IMPC_SOLR_BATCH_SIZE, 'wt': 'csv',
                  'fl': ','.join(list_of_columns)}
        response = requests.get(self.IMPC_SOLR_HOST, params=params, timeout=self.IMPC_SOLR_TIMEOUT, stream=True)
        response.raise_for_status()
        # Write records as they appear to avoid keeping the entire response in memory.
        with tempfile.NamedTemporaryFile('wt', delete=False) as tmp_file:
            response_lines = response.iter_lines(decode_unicode=True)
            header = next(response_lines)
            if start == 0:  # Only write the header for the first requested batch.
                tmp_file.write(header + '\n')
            number_of_records = 0
            for line in response_lines:
                number_of_records += 1
                tmp_file.write(line + '\n')
            return number_of_records, tmp_file.name

    def fetch_data(self, data_type, output_filename):
        """Fetch all rows of the requested data type to the specified location."""
        total_records = self.get_number_of_solr_records(data_type)
        assert total_records != 0, f'SOLR did not return any data for {data_type}.'
        with open(output_filename, 'wb') as outfile:
            start, total = 0, 0  # Initialise the counters.
            while True:
                number_of_records, tmp_filename = self.query_solr(data_type, start)
                with open(tmp_filename, 'rb') as tmp_file:
                    shutil.copyfileobj(tmp_file, outfile)
                os.remove(tmp_filename)
                # Increment the counters.
                start += self.IMPC_SOLR_BATCH_SIZE
                total += number_of_records
                # Exit when all documents have been retrieved.
                if total == total_records:
                    break


class IMPC:
    """Retrieve the data, load it into Spark, process and write the resulting evidence strings."""

    # Human gene mappings.
    HGNC_DATASET_URI = 'http://ftp.ebi.ac.uk/pub/databases/genenames/hgnc/tsv/hgnc_complete_set.txt'
    HGNC_DATASET_FILENAME = 'hgnc_complete_set.txt'

    # Mouse gene mappings.
    MGI_DATASET_URI = 'http://www.informatics.jax.org/downloads/reports/MGI_Gene_Model_Coord.rpt'
    MGI_DATASET_FILENAME = 'MGI_Gene_Model_Coord.rpt'

    # Mouse PubMed references.
    MGI_PUBMED_URI = 'http://www.informatics.jax.org/downloads/reports/MGI_PhenoGenoMP.rpt'
    MGI_PUBMED_FILENAME = 'MGI_PhenoGenoMP.rpt'

    # Mammalian Phenotype ontology.
    MGI_MP_URI = 'http://www.informatics.jax.org/downloads/reports/mp.owl'
    MGI_MP_FILENAME = 'MGI_mp.owl'

    IMPC_FILENAME = 'impc_solr_{data_type}.csv'

    def __init__(self, logger, cache_dir, local):
        self.logger = logger
        self.cache_dir = cache_dir
        self.gene_mapping, self.literature, self.mouse_phenotype_to_human_phenotype = [None] * 3
        self.model_mouse_phenotypes, self.disease_human_phenotypes, self.disease_model_summary = [None] * 3
        self.ontology, self.mp_terms, self.hp_terms, self.mp_class = [None] * 4
        self.evidence, self.mouse_phenotypes = [None] * 2

        # Initialize spark session
        spark_mem_limit = detect_spark_memory_limit()
        spark_conf = (
            SparkConf()
            .set('spark.driver.memory', f'{spark_mem_limit}g')
            .set('spark.executor.memory', f'{spark_mem_limit}g')
            .set('spark.driver.maxResultSize', '0')
            .set('spark.debug.maxToStringFields', '2000')
            .set('spark.sql.execution.arrow.maxRecordsPerBatch', '500000')
        )
        self.spark = (
            SparkSession.builder
            .config(conf=spark_conf)
            .master('local[*]')
            .getOrCreate()
        )

    def update_cache(self):
        """Fetch the Ensembl gene ID and SOLR data into the local cache directory."""
        pathlib.Path(self.cache_dir).mkdir(parents=False, exist_ok=True)

        self.logger.info('Fetching human gene ID mappings from HGNC.')
        urllib.request.urlretrieve(self.HGNC_DATASET_URI, os.path.join(self.cache_dir, self.HGNC_DATASET_FILENAME))

        self.logger.info('Fetching mouse gene ID mappings from MGI.')
        urllib.request.urlretrieve(self.MGI_DATASET_URI, os.path.join(self.cache_dir, self.MGI_DATASET_FILENAME))

        self.logger.info('Fetching mouse PubMed references from MGI.')
        urllib.request.urlretrieve(self.MGI_PUBMED_URI, os.path.join(self.cache_dir, self.MGI_PUBMED_FILENAME))

        self.logger.info('Fetching Mammalian Phenotype ontology definitions from MGI.')
        urllib.request.urlretrieve(self.MGI_MP_URI, os.path.join(self.cache_dir, self.MGI_MP_FILENAME))

        self.logger.info('Fetching data from IMPC SOLR.')
        impc_solr_retriever = ImpcSolrRetriever()
        for data_type in IMPC_SOLR_TABLES:
            self.logger.info(f'Fetching data type {data_type}.')
            filename = os.path.join(self.cache_dir, self.IMPC_FILENAME.format(data_type=data_type))
            impc_solr_retriever.fetch_data(data_type, filename)

    def load_tsv(self, filename, column_names=None):
        if column_names:
            schema = StructType([
                StructField(column_name, StringType(), True)
                for column_name in column_names
            ])
            header = False
        else:
            schema = None
            header = True
        return self.spark.read.csv(os.path.join(self.cache_dir, filename), sep='\t', header=header, schema=schema,
                                   nullValue='null')

    def load_solr_csv(self, data_type):
        """Load the CSV from SOLR; rename and select columns as specified."""
        df = self.spark.read.csv(
            os.path.join(self.cache_dir, self.IMPC_FILENAME.format(data_type=data_type)),
            header=True
        )
        column_name_mappings = [column_map.split(' > ') for column_map in IMPC_SOLR_TABLES[data_type]]
        columns_to_rename = {mapping[0]: mapping[1] for mapping in column_name_mappings if len(mapping) == 2}
        new_column_names = [mapping[-1] for mapping in column_name_mappings]
        # Rename columns.
        for old_column_name, new_column_name in columns_to_rename.items():
            df = df.withColumnRenamed(old_column_name, new_column_name)
        # Restrict only to the columns we need.
        return df.select(new_column_names)

    def load_data_from_cache(self):
        """Load SOLR and MRI data from the downloaded TSV/CSV files into Spark and prepare for processing."""
        # MGI gene ID → MGI gene name, Ensembl mouse gene ID.
        mgi_gene_id_to_ensembl_mouse_gene_id = (
            self.load_tsv(self.MGI_DATASET_FILENAME)
            .withColumnRenamed('1. MGI accession id', 'targetInModelMgiId')
            .withColumnRenamed('3. marker symbol', 'targetInModel')
            .withColumnRenamed('11. Ensembl gene id', 'targetInModelEnsemblId')
            .filter(pf.col('targetInModelMgiId').isNotNull())
            .select('targetInModelMgiId', 'targetInModel', 'targetInModelEnsemblId')
            # E.g. 'MGI:87859', 'Abl1', 'ENSMUSG00000026842'.
        )
        # MGI gene ID → HGNC ID.
        mouse_gene_to_human_gene = (
            self.load_solr_csv('gene_gene')
            .withColumnRenamed('gene_id', 'targetInModelMgiId')
            .select('targetInModelMgiId', 'hgnc_gene_id')
            # E.g. 'MGI:1346074', 'HGNC:4024'.
        )
        # HGNC ID → Ensembl human gene ID.
        hgnc_gene_id_to_ensembl_human_gene_id = (
            self.load_tsv(self.HGNC_DATASET_FILENAME)
            .withColumnRenamed('hgnc_id', 'hgnc_gene_id')
            .withColumnRenamed('ensembl_gene_id', 'targetFromSourceId')
            .select('hgnc_gene_id', 'targetFromSourceId')
            # E.g. 'HGNC:5', 'ENSG00000121410'.
        )
        # Using the three datasets above, we construct the complete gene mapping from MGI gene ID (the only type of
        # identifier used in the source data) to gene name, mouse Ensembl ID and human Ensembl ID. In cases where
        # mappings are not one to one, joins will handle the necessary explosions.
        self.gene_mapping = (
            mgi_gene_id_to_ensembl_mouse_gene_id
            .join(mouse_gene_to_human_gene, on='targetInModelMgiId', how='inner')
            .join(hgnc_gene_id_to_ensembl_human_gene_id, on='hgnc_gene_id', how='inner')
            # For both the evidence and mousePhenotypes datasets, entries without human gene mapping are unusable.
            .filter(pf.col('targetFromSourceId').isNotNull())
            .select('targetInModelMgiId', 'targetInModel', 'targetInModelEnsemblId', 'targetFromSourceId')
            # E.g. 'MGI:87859', 'Abl1', 'ENSMUSG00000026842', 'ENSG00000121410'.
        )

        # Mouse to human phenotype mappings.
        self.mouse_phenotype_to_human_phenotype = (
            self.load_solr_csv('ontology_ontology')
            .select('mp_id', 'hp_id')
            # E.g. 'MP:0000745', 'HP:0100033'.
        )

        # Mouse model and disease data. On loading, we split lists of phenotypes in the `mouse_model` and `disease`
        # tables and keep only the ID. For example, one row with 'MP:0001529 abnormal vocalization,MP:0002981 increased
        # liver weight' becomes two rows with 'MP:0001529' and 'MP:0002981'. Also note that the models are accessioned
        # with the same prefix ('MGI:') as genes, but they are separate entities.
        self.model_mouse_phenotypes = (
            self.load_solr_csv('mouse_model')
            .withColumn('mp_id', pf.expr(r"regexp_extract_all(model_phenotypes, '(MP:\\d+)', 1)"))
            .withColumn('mp_id', pf.explode('mp_id'))
            .select('model_id', 'mp_id')
            # E. g. 'MGI:3800884', 'MP:0001304'.
        )
        self.disease_human_phenotypes = (
            self.load_solr_csv('disease')
            .withColumn('hp_id', pf.expr(r"regexp_extract_all(disease_phenotypes, '(HP:\\d+)', 1)"))
            .withColumn('hp_id', pf.explode('hp_id'))
            .select('disease_id', 'hp_id')
            # E.g. 'OMIM:609258', 'HP:0000545'.
        )
        self.disease_model_summary = (
            self.load_solr_csv('disease_model_summary')
            .withColumnRenamed('model_genetic_background', 'biologicalModelGeneticBackground')
            .withColumnRenamed('model_description', 'biologicalModelAllelicComposition')
            # In Phenodigm, the scores report the association between diseases and animal models, not genes. The
            # phenotype similarity is computed using an algorithm called OWLSim which expresses the similarity in terms
            # of the Jaccard Index (simJ) or Information Content (IC). Therefore, to compute the score you can take the
            # maximum score of both analyses (disease_model_max_norm) or a combination of them both
            # (disease_model_avg_norm). In the Results and discussion section of the Phenodigm paper, the methods are
            # compared to a number of gold standards. It is concluded that the geometric mean of both analyses is the
            # superior metric and should therefore be used as the score.
            .withColumn('resourceScore', pf.col('disease_model_avg_norm').cast('float'))
            .drop('disease_model_avg_norm')
            .withColumnRenamed('marker_id', 'targetInModelMgiId')
            .select('model_id', 'biologicalModelGeneticBackground', 'biologicalModelAllelicComposition', 'disease_id',
                    'disease_term', 'resourceScore', 'targetInModelMgiId')
            # E. g. 'MGI:2681494', 'C57BL/6JY-smk', 'smk/smk', 'ORPHA:3097', 'Meacham Syndrome', 91.6, 'MGI:98324'.
        )

        self.ontology = (
            self.load_solr_csv('ontology')  # E.g. 'HP', 'HP:0000002', 'Abnormality of body height'.
            .filter((pf.col('ontology') == 'MP') | (pf.col('ontology') == 'HP'))
        )
        assert self.ontology.select('phenotype_id').distinct().count() == self.ontology.count(), \
            'Encountered multiple names for the same term in the ontology table.'
        # Process ontology information to enable MP and HP term lookup based on the ID.
        self.mp_terms, self.hp_terms = (
            self.ontology
            .filter(pf.col('ontology') == ontology_name)
            .withColumnRenamed('phenotype_id', f'{ontology_name.lower()}_id')
            .withColumnRenamed('phenotype_term', f'{ontology_name.lower()}_term')
            .select(f'{ontology_name.lower()}_id', f'{ontology_name.lower()}_term')
            for ontology_name in ('MP', 'HP')
        )
        # Process MP definitions to extract high level classes for each term.
        mp = pronto.Ontology(os.path.join(self.cache_dir, self.MGI_MP_FILENAME))
        high_level_classes = set(mp['MP:0000001'].subclasses(distance=1)) - {mp['MP:0000001']}
        mp_class = [
            [term.id, mp_high_level_class.id, mp_high_level_class.name]
            for mp_high_level_class in high_level_classes
            for term in mp_high_level_class.subclasses()
        ]
        self.mp_class = self.spark.createDataFrame(
            data=mp_class, schema=['modelPhenotypeId', 'modelPhenotypeClassId', 'modelPhenotypeClassLabel']
            # E.g. 'MP:0000275', 'MP:0005385', 'cardiovascular system phenotype'
        )

        # Literature cross-references which link a given mouse phenotype and a given mouse target.
        mgi_pubmed = (
            self.load_tsv(
                self.MGI_PUBMED_FILENAME,
                column_names=['_0', '_1', '_2', 'mp_id', 'literature', 'targetInModelMgiId']
            )
            .select('mp_id', 'literature', 'targetInModelMgiId')
            .distinct()
            # Separate and explode targets.
            .withColumn('targetInModelMgiId', pf.split(pf.col('targetInModelMgiId'), r'\|'))
            .withColumn('targetInModelMgiId', pf.explode('targetInModelMgiId'))
            # Separate and explode literature references.
            .withColumn('literature', pf.split(pf.col('literature'), r'\|'))
            .withColumn('literature', pf.explode('literature'))
            .select('mp_id', 'literature', 'targetInModelMgiId')
            # E.g. 'MP:0000600', '12529408', 'MGI:97874'.
        )
        # Literature references for a given (model, gene) combination.
        self.literature = (
            self.disease_model_summary
            .select('model_id', 'targetInModelMgiId')
            .distinct()
            .join(self.model_mouse_phenotypes, on='model_id', how='inner')
            .join(mgi_pubmed, on=['targetInModelMgiId', 'mp_id'], how='inner')
            .groupby('model_id', 'targetInModelMgiId')
            .agg(pf.collect_set(pf.col('literature')).alias('literature'))
            .select('model_id', 'targetInModelMgiId', 'literature')
        )

    @staticmethod
    def _cleanup_model_identifier(dataset):
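        """Strip zygosity/stage modifiers from the model identifier and keep only MGI
        namespace identifiers (illustrative, based on the comments below):
            'MGI:6274930#hom#early'   -> 'MGI:6274930'
            'NOT-RELEASED-025eb4a791' -> null
        """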
        return (
            # Model ID adjustments. First, strip the trailing modifiers, where present. The original ID, used for table
            # joins, may look like 'MGI:6274930#hom#early', where the first part is the allele ID and the second
            # specifies the zygotic state. There can be several models for the same allele ID with different phenotypes.
            # However, this information is also duplicated in `biologicalModelGeneticBackground` (for example:
            # 'C57BL/6NCrl,Ubl7<em1(IMPC)Tcp> hom early'), so in this field we strip those modifiers.
            dataset
            .withColumn(
                'biologicalModelId',
                pf.split(pf.col('model_id'), '#').getItem(0)
            )
            .drop('model_id')

            # Second, we only want to output the model names from the MGI namespace. An example of something we *don't*
            # want is 'NOT-RELEASED-025eb4a791'. This will be converted to null.
            .withColumn(
                'biologicalModelId',
                pf.when(pf.col('biologicalModelId').rlike(r'^MGI:\d+$'), pf.col('biologicalModelId'))
            )
        )

    def generate_IMPC_evidence_strings(self, score_cutoff):
        """Generate the evidence by renaming, transforming and joining the columns."""
        # Map mouse model phenotypes into human terms.
        model_human_phenotypes = (
            self.model_mouse_phenotypes
            .join(self.mouse_phenotype_to_human_phenotype, on='mp_id', how='inner')
            .select('model_id', 'hp_id')
        )

        # We are reporting all mouse phenotypes for a model, regardless of whether they can be mapped into any human
        # disease.
        all_mouse_phenotypes = (
            self.model_mouse_phenotypes
            .join(self.mp_terms, on='mp_id', how='inner')
            .groupby('model_id')
            .agg(
                pf.collect_set(pf.struct(
                    pf.col('mp_id').alias('id'),
                    pf.col('mp_term').alias('label')
                )).alias('diseaseModelAssociatedModelPhenotypes')
            )
            .select('model_id', 'diseaseModelAssociatedModelPhenotypes')
        )
        # For human phenotypes, we only want to include the ones which are present in the disease *and* also can be
        # traced back to the model phenotypes through the MP → HP mapping relationship.
        matched_human_phenotypes = (
            # We start with all possible pairs of model-disease associations.
            self.disease_model_summary.select('model_id', 'disease_id')
            # Add all disease phenotypes. Now we have: model_id, disease_id, hp_id (from disease).
            .join(self.disease_human_phenotypes, on='disease_id', how='inner')
            # Only keep the phenotypes which also appear in the mouse model (after mapping).
            .join(model_human_phenotypes, on=['model_id', 'hp_id'], how='inner')
            # Add ontology terms in addition to IDs. Now we have: model_id, disease_id, hp_id, hp_term.
            .join(self.hp_terms, on='hp_id', how='inner')
            .groupby('model_id', 'disease_id')
            .agg(
                pf.collect_set(pf.struct(
                    pf.col('hp_id').alias('id'),
                    pf.col('hp_term').alias('label')
                )).alias('diseaseModelAssociatedHumanPhenotypes')
            )
            .select('model_id', 'disease_id', 'diseaseModelAssociatedHumanPhenotypes')
        )

        self.evidence = (
            # This table contains all unique (model_id, disease_id) associations which form the base of the evidence
            # strings.
            self.disease_model_summary

            # Filter out the associations with a low score.
            .filter(~(pf.col('resourceScore') < score_cutoff))

            # Add mouse gene mapping information. The mappings are not necessarily one to one. When this happens, join
            # will handle the necessary explosions, and a single row from the original table will generate multiple
            # evidence strings. This adds the fields 'targetInModel', 'targetInModelEnsemblId', and
            # 'targetFromSourceId'.
            .join(self.gene_mapping, on='targetInModelMgiId', how='inner')

            # Add all mouse phenotypes of the model → `diseaseModelAssociatedModelPhenotypes`.
            .join(all_mouse_phenotypes, on='model_id', how='left')
            # Add the matched model/disease human phenotypes → `diseaseModelAssociatedHumanPhenotypes`.
            .join(matched_human_phenotypes, on=['model_id', 'disease_id'], how='left')

            # Add literature references → 'literature'.
            .join(self.literature, on=['model_id', 'targetInModelMgiId'], how='left')
        )
        self.evidence = (
            # Post-process model ID field.
            self._cleanup_model_identifier(self.evidence)

            # Rename the disease data columns.
            .withColumnRenamed('disease_id', 'diseaseFromSourceId')
            .withColumnRenamed('disease_term', 'diseaseFromSource')
            # Add constant value columns.
            .withColumn('datasourceId', pf.lit('impc'))
            .withColumn('datatypeId', pf.lit('animal_model'))
        )

        # Add EFO mapping information.
        self.evidence = add_efo_mapping(evidence_strings=self.evidence, spark_instance=self.spark,
                                        ontoma_cache_dir=self.cache_dir)

        # In case of multiple records with the same unique fields, keep only the one record with the highest score. This
        # is done to avoid duplicates where multiple source ontology records map to the same EFO record with slightly
        # different scores.
        w = Window.partitionBy([pf.col(c) for c in UNIQUE_FIELDS]).orderBy(pf.col('resourceScore').desc())
        self.evidence = (
            self.evidence
            .withColumn('row', pf.row_number().over(w))
            .filter(pf.col('row') == 1)
            .drop('row')
        )

        # Ensure stable column order.
        self.evidence = self.evidence.select(
            'biologicalModelAllelicComposition', 'biologicalModelGeneticBackground', 'biologicalModelId',
            'datasourceId', 'datatypeId', 'diseaseFromSource', 'diseaseFromSourceId', 'diseaseFromSourceMappedId',
            'diseaseModelAssociatedHumanPhenotypes', 'diseaseModelAssociatedModelPhenotypes', 'literature',
            'resourceScore', 'targetFromSourceId', 'targetInModel', 'targetInModelEnsemblId', 'targetInModelMgiId'
        )

    def generate_mouse_phenotypes_dataset(self):
        """Generate the related mousePhenotypes dataset for the corresponding widget in the target object."""
        mouse_phenotypes = (
            # Extract base model-target associations.
            self.disease_model_summary
            .select('model_id', 'biologicalModelAllelicComposition', 'biologicalModelGeneticBackground',
                    'targetInModelMgiId')
            .distinct()

            # Add gene mapping information.
            .join(self.gene_mapping, on='targetInModelMgiId', how='inner')

            # Add mouse phenotypes.
            .join(self.model_mouse_phenotypes, on='model_id', how='inner')
            .join(self.mp_terms, on='mp_id', how='inner')

            # Add literature references.
            .join(self.literature, on=['model_id', 'targetInModelMgiId'], how='left')

            # Rename fields.
            .withColumnRenamed('mp_id', 'modelPhenotypeId')
            .withColumnRenamed('mp_term', 'modelPhenotypeLabel')

            # Join phenotype class information.
            .join(self.mp_class, on='modelPhenotypeId', how='inner')
        )

        # Post-process model ID field.
        mouse_phenotypes = self._cleanup_model_identifier(mouse_phenotypes)

        # Convert the schema from flat to partially nested, grouping related models and phenotype classes.
        self.mouse_phenotypes = (
            mouse_phenotypes
            .groupby(
                'targetInModel', 'targetInModelMgiId', 'targetInModelEnsemblId', 'targetFromSourceId',
                'modelPhenotypeId', 'modelPhenotypeLabel'
            )
            .agg(
                pf.collect_set(
                    pf.struct(
                        pf.col('biologicalModelAllelicComposition').alias('allelicComposition'),
                        pf.col('biologicalModelGeneticBackground').alias('geneticBackground'),
                        pf.col('biologicalModelId').alias('id'),
                        pf.col('literature')
                    )
                ).alias('biologicalModels'),
                pf.collect_set(
                    pf.struct(
                        pf.col('modelPhenotypeClassId').alias('id'),
                        pf.col('modelPhenotypeClassLabel').alias('label')
                    )
                ).alias('modelPhenotypeClasses')
            )
        )

    def write_datasets(self, evidence_strings_filename, mouse_phenotypes_filename):
        """Dump the Spark evidence dataframe as a compressed JSON file. The order of the evidence strings is not
        maintained, and they are returned in random order as collected by Spark."""
        for dataset, outfile in ((self.evidence, evidence_strings_filename),
                                 (self.mouse_phenotypes, mouse_phenotypes_filename)):
            logging.info(f'Processing dataset {outfile}')
            write_evidence_strings(dataset, outfile)


def main(
    cache_dir, evidence_output, mouse_phenotypes_output, score_cutoff, use_cached=False, log_file=None, local=False
) -> None:
    # Initialize the logger based on the provided log file. If no log file is specified, logs are written to STDERR.
    logging_config = {
        'level': logging.INFO,
        'format': '%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s',
        'datefmt': '%Y-%m-%d %H:%M:%S',
    }
    if log_file:
        logging_config['filename'] = log_file
    logging.basicConfig(**logging_config)

    # Process the data.
    impc = IMPC(logging, cache_dir, local)
    if not use_cached:
        logging.info('Update the HGNC/MGI/SOLR cache.')
        impc.update_cache()

    logging.info('Load gene mappings and SOLR data from local cache.')
    impc.load_data_from_cache()

    logging.info('Build the evidence strings.')
    impc.generate_IMPC_evidence_strings(score_cutoff)

    logging.info('Generate the mousePhenotypes dataset.')
    impc.generate_mouse_phenotypes_dataset()

    logging.info('Collect and write the datasets.')
    impc.write_datasets(evidence_output, mouse_phenotypes_output)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    req = parser.add_argument_group('required arguments')
    req.add_argument('--cache-dir', required=True, help='Directory to store the HGNC/MGI/SOLR cache files in.')
    req.add_argument('--output-evidence', required=True,
                     help='Name of the json.gz file to output the evidence strings into.')
    req.add_argument('--output-mouse-phenotypes', required=True,
                     help='Name of the json.gz file to output the mousePhenotypes dataset into.')
    parser.add_argument('--score-cutoff', help=(
        'Discard model-disease associations with the `disease_model_avg_norm` score less than this value. The score '
        'ranges from 0 to 100.'
    ), type=float, default=0.0)
    parser.add_argument('--use-cached', help='Use the existing cache and do not update it.', action='store_true')
    parser.add_argument('--log-file', help='Optional filename to redirect the logs into.')
    parser.add_argument(
        '--local', action='store_true', required=False, default=False,
        help='Flag to indicate if the script is executed locally or on the cluster'
    )
    args = parser.parse_args()
    main(args.cache_dir, args.output_evidence, args.output_mouse_phenotypes, args.score_cutoff, args.use_cached,
         args.log_file, args.local)
from __future__ import annotations
import argparse
import sys
import logging

from pyspark.sql import DataFrame, functions as f

from common.evidence import (
    write_evidence_strings,
    initialize_sparksession,
)


class intogenEvidenceGenerator:
    def __init__(
        self: intogenEvidenceGenerator,
        inputGenes: str,
        inputCohorts: str,
        diseaseMapping: str | None,
    ) -> None:
        # Initialize spark session:
        self.spark = initialize_sparksession()

        # Reading cohort table:
        cohorts = self.spark.read.csv(
            inputCohorts, sep=r"\t", header=True, inferSchema=True
        ).drop("SAMPLES")

        # Reading cancer driver genes:
        self.evidence = (
            self.spark.read.csv(inputGenes, sep=r"\t", header=True, inferSchema=True)
            # Join with cohort table:
            .join(cohorts, on="COHORT", how="inner").select(
                # Adding constant columns:
                f.lit("somatic_mutation").alias("datatypeId"),
                f.lit("intogen").alias("datasourceId"),
                # Extracting gene fields:
                f.col("SYMBOL").alias("targetFromSourceId"),
                # Extracting cohort specific annotation:
                f.col("COHORT").alias("cohortId"),
                f.col("COHORT_NICK").alias("cohortShortName"),
                f.col("COHORT_NAME").alias("cohortDescription"),
                # Splitting methods:
                f.split(f.col("METHODS"), ",").alias("significantDriverMethods"),
                # Generating mutated samples column:
                f.array(
                    f.struct(
                        # Parse functional consequences:
                        f.when(f.col("ROLE") == "Act", "SO_0002053")  # Gain of function
                        .when(f.col("ROLE") == "LoF", "SO_0002054")  # Loss of function
                        .otherwise(None)
                        .alias("functionalConsequenceId"),
                        # Extract samples:
                        f.col("SAMPLES").alias("numberSamplesTested"),
                        f.col("SAMPLES").alias("numberMutatedSamples"),
                    )
                ).alias("mutatedSamples"),
                # Adding disease name:
                f.col("CANCER").alias("Cancer_type_acronym"),
                f.col("CANCER_NAME").alias("diseaseFromSource"),
                # Adding score:
                f.col("QVALUE_COMBINATION").alias("resourceScore"),
            )
        )

        # Mapping step executed if mapping file is provided:
        if diseaseMapping is not None:
            logging.info(f"Applying disease mapping from {diseaseMapping}.")
            self.evidence = self.cancer2EFO(diseaseMapping)

            # Extracting stats about mapping:
            unmapped_evidence_count = self.evidence.filter(
                f.col("diseaseFromSourceMappedId").isNull()
            ).count()
            unmapped_disease_count = (
                self.evidence.filter(f.col("diseaseFromSourceMappedId").isNull())
                .select("diseaseFromSource")
                .distinct()
                .count()
            )
            logging.info(
                f"Number of evidence without EFO mapping: {unmapped_evidence_count}"
            )
            logging.info(
                f"Number of diseases without EFO mapping: {unmapped_disease_count}"
            )

        # Dropping cancer type acronym:
        self.evidence = self.evidence.drop("Cancer_type_acronym")

    def write_evidence(self: intogenEvidenceGenerator, evidence_file: str) -> None:
        write_evidence_strings(self.evidence, evidence_file)
        logging.info(
            f"{self.evidence.count()} evidence strings saved into {evidence_file}. Exiting."
        )

    def cancer2EFO(self: intogenEvidenceGenerator, diseaseMapping: str) -> DataFrame:
        diseaseMappingsFile = self.spark.read.csv(
            diseaseMapping, sep=r"\t", header=True
        ).select(
            "Cancer_type_acronym",
            f.trim(f.col("EFO_id")).alias("diseaseFromSourceMappedId"),
        )

        return self.evidence.join(
            diseaseMappingsFile, on="Cancer_type_acronym", how="left"
        )


def main(inputGenes, inputCohorts, diseaseMapping, outputFile):
    # Logging parameters
    logging.info(f"intOGen driver genes table: {inputGenes}")
    logging.info(f"intOGen cohorts table: {inputCohorts}")
    logging.info(f"Output file: {outputFile}")
    if diseaseMapping:
        logging.info(f"Cancer type to EFO ID table: {diseaseMapping}")
    else:
        logging.info("Disease mapping is skipped.")

    # Initialize evidence builder object
    evidenceBuilder = intogenEvidenceGenerator(inputGenes, inputCohorts, diseaseMapping)

    # Write evidence file:
    evidenceBuilder.write_evidence(outputFile)


if __name__ == "__main__":
    # Initiating parser
    parser = argparse.ArgumentParser(
        description="This script generates evidence for the intOGen data source."
    )

    parser.add_argument(
        "-g",
        "--inputGenes",
        required=True,
        type=str,
        help="Input source .tsv file listing the driver genes across the analyzed cohorts.",
    )
    parser.add_argument(
        "-c",
        "--inputCohorts",
        required=True,
        type=str,
        help="Input source .tsv file with information about the analyzed cohorts.",
    )
    parser.add_argument(
        "-d",
        "--diseaseMapping",
        required=False,
        type=str,
        help="Input look-up table containing the cancer type mappings to an EFO ID.",
    )
    parser.add_argument(
        "-o",
        "--outputFile",
        required=True,
        type=str,
        help="Gzipped JSON file containing the evidence strings.",
    )
    parser.add_argument(
        "-l",
        "--logFile",
        help="Destination of the logs generated by this script.",
        type=str,
        required=False,
    )

    # Parsing parameters
    args = parser.parse_args()
    inputGenes = args.inputGenes
    inputCohorts = args.inputCohorts
    diseaseMapping = args.diseaseMapping
    outputFile = args.outputFile

    # Initialize logging:
    logging.basicConfig(
        level=logging.INFO,
        format="%(name)s - %(levelname)s - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    # basicConfig above already sends logs to stderr; optionally also write them to a file.
    if args.logFile:
        file_handler = logging.FileHandler(args.logFile)
        file_handler.setFormatter(logging.Formatter("%(name)s - %(levelname)s - %(message)s"))
        logging.getLogger().addHandler(file_handler)

    main(inputGenes, inputCohorts, diseaseMapping, outputFile)
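The cancer2EFO step above is a plain left join on the cancer type acronym; below is a minimal, self-contained sketch of that join on toy data (the acronyms and EFO identifier are placeholders, not real mappings).

# Toy illustration of the cancer2EFO left join (placeholder values, not real intOGen records).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cancer2efo_sketch").getOrCreate()

evidence = spark.createDataFrame(
    [("ACRO_A", "GENE1"), ("ACRO_B", "GENE2")],
    ["Cancer_type_acronym", "targetFromSourceId"],
)
mapping = spark.createDataFrame(
    [("ACRO_A", "EFO_0000000")],  # placeholder EFO identifier
    ["Cancer_type_acronym", "diseaseFromSourceMappedId"],
)

# Rows without a mapping keep a null diseaseFromSourceMappedId; these are the rows
# counted by the "without EFO mapping" statistics in the parser.
evidence.join(mapping, on="Cancer_type_acronym", how="left").show()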
import argparse
from itertools import chain
import logging
import sys

import xml.etree.ElementTree as ET
from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import array_distinct, col, create_map, lit, split

from common.ontology import add_efo_mapping
from common.evidence import initialize_sparksession, write_evidence_strings

# All remaining association types are assigned a germline allele origin.
EXCLUDED_ASSOCIATIONTYPES = [
    'Major susceptibility factor in',
    'Part of a fusion gene in',
    'Candidate gene tested in',
    'Role in the phenotype of',
    'Biomarker tested in',
    'Disease-causing somatic mutation(s) in',
]

# Assigning variantFunctionalConsequenceId:
CONSEQUENCE_MAP = {
    'Disease-causing germline mutation(s) (loss of function) in': 'SO_0002054',
    'Disease-causing germline mutation(s) in': None,
    'Modifying germline mutation in': None,
    'Disease-causing germline mutation(s) (gain of function) in': 'SO_0002053',
}


def main(input_file: str, output_file: str, cache_dir: str) -> None:

    # Read and process Orphanet's XML file into evidence strings

    orphanet_df = parse_orphanet_xml(input_file, spark)
    logging.info('Orphanet input file has been imported. Processing evidence strings.')

    evidence_df = process_orphanet(orphanet_df)

    evidence_df = add_efo_mapping(evidence_strings=evidence_df, spark_instance=spark, ontoma_cache_dir=cache_dir)
    logging.info('Disease mappings have been added.')

    # Save data
    write_evidence_strings(evidence_df, output_file)
    logging.info(f'{evidence_df.count()} evidence strings have been saved to {output_file}')


def parse_orphanet_xml(orphanet_file: str, spark_instance) -> DataFrame:
    """
    Function to parse Orphanet xml dump and return the parsed
    data as a list of dictionaries.

    Args:
        orphanet_file (str): Orphanet XML filename

    Returns:
        parsed data as a list of dictionaries
    """

    # Reading + validating xml:
    tree = ET.parse(orphanet_file)
    assert isinstance(tree, ET.ElementTree)

    root = tree.getroot()
    assert isinstance(root, ET.Element)

    # Checking if the basic nodes are in the xml structure:

    logging.info(f"There are {root.find('DisorderList').get('count')} disease in the Orphanet xml file.")

    orphanet_disorders = []

    for disorder in root.find('DisorderList').findall('Disorder'):

        # Extracting disease information:
        parsed_disorder = {
            'diseaseFromSource': disorder.find('Name').text,
            'diseaseFromSourceId': 'Orphanet_' + disorder.find('OrphaCode').text,
            'type': disorder.find('DisorderType/Name').text,
        }

        # One disease might be mapped to multiple genes:
        for association in disorder.find('DisorderGeneAssociationList'):

            # For each mapped gene, an evidence is created:
            evidence = parsed_disorder.copy()

            # Not every gene/disease association is backed by a publication:
            try:
                evidence['literature'] = [
                    pmid.replace('[PMID]', '').rstrip()
                    for pmid in association.find('SourceOfValidation').text.split('_')
                    if '[PMID]' in pmid
                ]
            except AttributeError:
                evidence['literature'] = None

            evidence['associationType'] = association.find('DisorderGeneAssociationType/Name').text
            evidence['confidence'] = association.find('DisorderGeneAssociationStatus/Name').text

            # Parse gene name and id - going for Ensembl gene id only:
            gene = association.find('Gene')
            evidence['targetFromSource'] = gene.find('Name').text

            # Extracting ensembl gene id from cross references:
            ensembl_gene_id = [
                xref.find('Reference').text
                for xref in gene.find('ExternalReferenceList')
                if 'ENSG' in xref.find('Reference').text
            ]
            evidence['targetFromSourceId'] = ensembl_gene_id[0] if len(ensembl_gene_id) > 0 else None

            # Collect evidence:
            orphanet_disorders.append(evidence)

    # Create a spark dataframe from the parsed data
    orphanet_df = spark_instance.createDataFrame(Row(**x) for x in orphanet_disorders)

    return orphanet_df


def process_orphanet(orphanet_df: DataFrame) -> DataFrame:
    """
    Filter and reshape the parsed Orphanet dataframe into the evidence string schema.
    """

    # Map association type to sequence ontology ID:
    so_mapping_expr = create_map([lit(x) for x in chain(*CONSEQUENCE_MAP.items())])

    evidence_df = (
        orphanet_df.filter(~col('associationType').isin(EXCLUDED_ASSOCIATIONTYPES))
        .filter(~col('targetFromSourceId').isNull())
        .withColumn('datasourceId', lit('orphanet'))
        .withColumn('datatypeId', lit('genetic_association'))
        .withColumn('alleleOrigins', split(lit('germline'), '_'))
        .withColumn('literature', array_distinct(col('literature')))
        .withColumn(
            'variantFunctionalConsequenceId',
            so_mapping_expr.getItem(col('associationType')),
        )
        .drop('associationType', 'type')
        # Select the evidence relevant fields
        .select(
            'datasourceId',
            'datatypeId',
            'alleleOrigins',
            'confidence',
            'diseaseFromSource',
            'diseaseFromSourceId',
            'literature',
            'targetFromSource',
            'targetFromSourceId',
            'variantFunctionalConsequenceId',
        )
        .persist()
    )

    return evidence_df


if __name__ == '__main__':

    parser = argparse.ArgumentParser(
        description=(
            'Parse Orphanet gene-disease annotation downloaded from '
            'http://www.orphadata.org/data/xml/en_product6.xml'
        )
    )
    parser.add_argument(
        '--input_file',
        help='Xml file containing target/disease associations.',
        type=str,
    )
    parser.add_argument(
        '--output_file',
        help='Absolute path of the gzipped, JSON evidence file.',
        type=str,
    )
    parser.add_argument(
        '--logFile',
        help='Destination of the logs generated by this script.',
        type=str,
        required=False,
    )
    parser.add_argument(
        '--cache_dir',
        required=False,
        help='Directory to store the OnToma cache files in.',
    )

    args = parser.parse_args()

    input_file = args.input_file
    output_file = args.output_file
    log_file = args.logFile
    cache_dir = args.cache_dir

    # Initialize logging:
    logging.basicConfig(
        level=logging.INFO,
        format='%(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )

    global spark
    spark = initialize_sparksession()

    # basicConfig above already sends logs to stderr; optionally also write them to a file.
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s'))
        logging.getLogger().addHandler(file_handler)

    main(input_file, output_file, cache_dir)
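The association-type-to-consequence translation in process_orphanet relies on turning a Python dictionary into a Spark map column. A minimal sketch of that idiom on a toy dictionary (keys and column names chosen purely for illustration):

# Sketch of the create_map idiom used to look up values from a Python dict inside a Spark column.
from itertools import chain

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, create_map, lit

spark = SparkSession.builder.appName('create_map_sketch').getOrCreate()

consequence_map = {'gain of function': 'SO_0002053', 'loss of function': 'SO_0002054'}

# chain(*dict.items()) flattens the dict into key1, value1, key2, value2, ...
mapping_expr = create_map([lit(x) for x in chain(*consequence_map.items())])

df = spark.createDataFrame([('loss of function',), ('unknown',)], ['associationType'])
# getItem returns null for keys that are absent from the map, mirroring the parser's behaviour.
df.withColumn('consequenceId', mapping_expr.getItem(col('associationType'))).show()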
import argparse
import logging
import re

import requests
from pyspark.sql import functions as f, SparkSession

from common.ontology import add_efo_mapping
from common.evidence import write_evidence_strings


class PanelAppEvidenceGenerator:

    # Fixes which are applied to original PanelApp phenotype strings *before* splitting them by semicolon.
    PHENOTYPE_BEFORE_SPLIT_RE = {
        # Fixes for specific records.
        r"\(HP:0006574;\);": r"(HP:0006574);",
        r"Abruzzo-Erickson;syndrome": r"Abruzzo-Erickson syndrome",
        r"Deafness, autosomal recessive; 12": r"Deafness, autosomal recessive, 12",
        r"Waardenburg syndrome, type; 3": r"Waardenburg syndrome, type 3",
        r"Ectrodactyly, ectodermal dysplasia, and cleft lip/palate syndrome; 3": r"Ectrodactyly, ectodermal dysplasia, and cleft lip/palate syndrome, 3",
        # Remove curly braces. They are *sometimes* (not consistently) used to separate disease name and OMIM code, for
        # example: "{Bladder cancer, somatic}, 109800", and interfere with regular expressions for extraction.
        r"[{}]": r"",
        # Fix cases like "Aarskog-Scott syndrome, 305400Mental retardation, X-linked syndromic 16, 305400", where
        # several phenotypes are glued to each other due to formatting errors.
        r"(\d{6})([A-Za-z])": r"$1;$2",
        # Replace all tab/space sequences with a single space.
        r"[\t ]+": r" ",
        # Remove leading and/or trailing spaces around semicolons.
        r" ?; ?": r";",
    }

    # Cleanup regular expressions which are applied to the phenotypes *after* splitting.
    PHENOTYPE_AFTER_SPLIT_RE = (
        r" \(no OMIM number\)",
        r" \(NO phenotype number in OMIM\)",
        r"(no|No|NO) OMIM( phenotype|number|entry|NUMBER|NUMBER OR DISEASE)?",
        r"[( ]*(from )?PMID:? *\d+[ ).]*",
    )

    # Regular expressions for extracting ontology information.
    LEADING = r"[ ,-]*"
    SEPARATOR = r"[:_ #]*"
    TRAILING = r"[:.]*"
    OMIM_RE = LEADING + r"(OMIM|MIM)?" + SEPARATOR + r"(\d{6})" + TRAILING
    OTHER_RE = (
        LEADING
        + r"(OrphaNet: ORPHA|Orphanet|ORPHA|HP|MONDO)"
        + SEPARATOR
        + r"(\d+)"
        + TRAILING
    )

    # Regular expression for filtering out bad PMID records.
    PMID_FILTER_OUT_RE = r"224,614,752,030,146,000,000,000"

    # Regular expressions for extracting publication information from the API raw strings.
    PMID_RE = [
        (
            # Pattern 1, e.g. '15643612' or '28055140, 27333055, 23063529'.
            r"^"  # Start of the string.
            r"[\d, ]+"  # A sequence of digits, commas and spaces.
            r"(?: |$)"  # Ending either with a space, or with the end of the string.
        ),
        (
            # Pattern 2, e.g. '... observed in the patient. PMID: 1908107 - publication describing function of ...'
            r"(?:PubMed|PMID)"  # PubMed or a PMID prefix.
            r"[: ]*"  # An optional separator (spaces/colons).
            r"[\d, ]+"  # A sequence of digits, commas and spaces.
        ),
    ]

    def __init__(self, debug_output_prefix=None):
        self.spark = SparkSession.builder.appName("panelapp_parser").getOrCreate()
        self.debug_output_phenotypes_filename = None
        self.debug_output_pmids_filename = None
        self.debug_output_pmids_data = set()
        if debug_output_prefix:
            self.debug_output_phenotypes_filename = (
                f"{debug_output_prefix}.phenotypes.tsv"
            )
            self.debug_output_pmids_filename = f"{debug_output_prefix}.pmids.tsv"

    def __del__(self):
        if self.debug_output_pmids_filename:
            with open(self.debug_output_pmids_filename, "w") as outfile:
                outfile.write(
                    "Source PMID string\tExtracted PMIDs\tNumber of extracted PMIDs\n"
                )
                for line in sorted(set(self.debug_output_pmids_data)):
                    outfile.write(line)

    def generate_panelapp_evidence(
        self, input_file: str, output_file: str, cache_dir: str
    ) -> None:
        logging.info("Filter and extract the necessary columns.")
        panelapp_df = self.spark.read.csv(input_file, sep=r"\t", header=True)
        # Panel version can be either a single number (e.g. 1), or two numbers separated by a dot (e.g. 3.14). We cast
        # either representation to float to ensure correct filtering below. (Note that conversion to float would not
        # work in the general case, because 3.4 > 3.14, but we only need to compare relative to 1.0.)
        panelapp_df = panelapp_df.withColumn(
            "Panel Version",
            panelapp_df["Panel Version"].cast("float").alias("Panel Version"),
        )
        panelapp_df = (
            panelapp_df.filter(
                ((f.col("List") == "green") | (f.col("List") == "amber"))
                & (f.col("Panel Version") >= 1.0)
                & (f.col("Panel Status") == "PUBLIC")
            ).select(
                "Symbol",
                "Panel Id",
                "Panel Name",
                "List",
                "Mode of inheritance",
                "Phenotypes",
            )
            # The full original records are not redundant; however, uniqueness on a subset of fields is not guaranteed.
            .distinct()
        )

        logging.info(
            "Fix typos and formatting errors which would interfere with phenotype splitting."
        )
        panelapp_df = panelapp_df.withColumn("cleanedUpPhenotypes", f.col("Phenotypes"))
        for regexp, replacement in self.PHENOTYPE_BEFORE_SPLIT_RE.items():
            panelapp_df = panelapp_df.withColumn(
                "cleanedUpPhenotypes",
                f.regexp_replace(f.col("cleanedUpPhenotypes"), regexp, replacement),
            )

        logging.info("Split and explode the phenotypes.")
        panelapp_df = panelapp_df.withColumn(
            "cohortPhenotypes",
            f.array_distinct(f.split(f.col("cleanedUpPhenotypes"), ";")),
        ).withColumn("phenotype", f.explode(f.col("cohortPhenotypes")))

        logging.info(
            "Remove specific patterns and phrases which will interfere with ontology extraction and mapping."
        )
        panelapp_df = panelapp_df.withColumn("diseaseFromSource", f.col("phenotype"))
        for regexp in self.PHENOTYPE_AFTER_SPLIT_RE:
            panelapp_df = panelapp_df.withColumn(
                "diseaseFromSource",
                f.regexp_replace(f.col("diseaseFromSource"), f"({regexp})", ""),
            )

        logging.info(
            "Extract ontology information, clean up and filter the split phenotypes."
        )
        panelapp_df = (
            panelapp_df
            # Extract Orphanet/MONDO/HP ontology identifiers and remove them from the phenotype string.
            .withColumn(
                "ontology_namespace",
                f.regexp_extract(f.col("diseaseFromSource"), self.OTHER_RE, 1),
            )
            .withColumn(
                "ontology_namespace",
                f.regexp_replace(
                    f.col("ontology_namespace"), "OrphaNet: ORPHA", "ORPHA"
                ),
            )
            .withColumn(
                "ontology_id",
                f.regexp_extract(f.col("diseaseFromSource"), self.OTHER_RE, 2),
            )
            .withColumn(
                "ontology",
                f.when(
                    (f.col("ontology_namespace") != "") & (f.col("ontology_id") != ""),
                    f.concat(
                        f.col("ontology_namespace"), f.lit(":"), f.col("ontology_id")
                    ),
                ),
            )
            .withColumn(
                "diseaseFromSource",
                f.regexp_replace(f.col("diseaseFromSource"), f"({self.OTHER_RE})", ""),
            )
            # Extract OMIM identifiers and remove them from the phenotype string.
            .withColumn(
                "omim_id", f.regexp_extract(f.col("diseaseFromSource"), self.OMIM_RE, 2)
            )
            .withColumn(
                "omim",
                f.when(
                    f.col("omim_id") != "", f.concat(f.lit("OMIM:"), f.col("omim_id"))
                ),
            )
            .withColumn(
                "diseaseFromSource",
                f.regexp_replace(f.col("diseaseFromSource"), f"({self.OMIM_RE})", ""),
            )
            # Choose one of the ontology identifiers, keeping OMIM as a priority.
            .withColumn(
                "diseaseFromSourceId",
                f.when(f.col("omim").isNotNull(), f.col("omim")).otherwise(
                    f.col("ontology")
                ),
            )
            .drop("ontology_namespace", "ontology_id", "ontology", "omim_id", "omim")
            # Clean up the final split phenotypes.
            .withColumn(
                "diseaseFromSource",
                f.regexp_replace(f.col("diseaseFromSource"), r"\(\)", ""),
            )
            .withColumn("diseaseFromSource", f.trim(f.col("diseaseFromSource")))
            .withColumn(
                "diseaseFromSource",
                f.when(f.col("diseaseFromSource") != "", f.col("diseaseFromSource")),
            )
            # Remove low quality records, where the name of the phenotype string starts with a question mark.
            .filter(
                ~(
                    (f.col("diseaseFromSource").isNotNull())
                    & (f.col("diseaseFromSource").startswith("?"))
                )
            )
            # Remove duplication caused by cases where multiple phenotypes within the same record fail to generate any
            # phenotype string or ontology identifier.
            .distinct()
            # For records where we were unable to determine either a phenotype string or an ontology identifier,
            # substitute the panel name instead.
            .withColumn(
                "diseaseFromSource",
                f.when(
                    (f.col("diseaseFromSource").isNull())
                    & (f.col("diseaseFromSourceId").isNull()),
                    f.col("Panel Name"),
                ).otherwise(f.col("diseaseFromSource")),
            )
            .persist()
        )

        logging.info("Fetch and join literature references.")
        all_panel_ids = panelapp_df.select("Panel Id").toPandas()["Panel Id"].unique()
        literature_references = self.fetch_literature_references(all_panel_ids)
        panelapp_df = panelapp_df.join(
            literature_references, on=["Panel Id", "Symbol"], how="left"
        )

        if self.debug_output_phenotypes_filename:
            logging.info("Output tables for debugging purposes, if requested.")
            (
                panelapp_df.select(
                    "Phenotypes",  # Original, unaltered string with all phenotypes.
                    "cleanedUpPhenotypes",  # String with phenotypes after pre-split cleanup.
                    "phenotype",  # Individual phenotype after splitting.
                    "diseaseFromSource",  # Final cleaned up disease name.
                    "diseaseFromSourceId",  # Final cleaned up disease ID.
                )
                .distinct()
                .toPandas()
                .to_csv(self.debug_output_phenotypes_filename, sep="\t", index=False)
            )

        logging.info(
            "Drop unnecessary fields and populate the final evidence string structure."
        )
        evidence_df = (
            panelapp_df.drop("Phenotypes", "cleanedUpPhenotypes", "phenotype")
            # allelicRequirements requires a list, but we always only have one value from PanelApp.
            .withColumn(
                "allelicRequirements",
                f.when(
                    f.col("Mode of inheritance").isNotNull(),
                    f.array(f.col("Mode of inheritance")),
                ),
            )
            .drop("Mode of inheritance")
            .withColumnRenamed("List", "confidence")
            .withColumn("datasourceId", f.lit("genomics_england"))
            .withColumn("datatypeId", f.lit("genetic_literature"))
            # diseaseFromSourceId populated above
            # literature populated above
            .withColumnRenamed("Panel Id", "studyId")
            .withColumnRenamed("Panel Name", "studyOverview")
            .withColumnRenamed("Symbol", "targetFromSourceId")
            # Some residual duplication is caused by slightly different representations from `cohortPhenotypes` being
            # cleaned up to the same representation in `diseaseFromSource`, for example "Pontocerebellar hypoplasia type
            # 2D (613811)" and "Pontocerebellar hypoplasia type 2D, 613811".
            .distinct()
        )

        evidence_df = add_efo_mapping(
            evidence_strings=evidence_df,
            spark_instance=self.spark,
            ontoma_cache_dir=cache_dir,
        )
        logging.info("Disease mappings have been added.")

        write_evidence_strings(evidence_df, output_file)
        logging.info(
            f"{evidence_df.count()} evidence strings have been saved to {output_file}"
        )

    def fetch_literature_references(self, all_panel_ids):
        """Queries the PanelApp API to extract all literature references for (panel ID, gene symbol) combinations."""
        publications = []  # Contains tuples of (panel ID, gene symbol, PubMed ID).
        for panel_id in all_panel_ids:
            logging.debug(f"Fetching literature references for panel {panel_id!r}.")
            url = f"https://panelapp.genomicsengland.co.uk/api/v1/panels/{panel_id}"
            panel_data = requests.get(url).json()

            # The source data and the online data might not be in-sync, due to requesting retired panels.
            # We still keep these entries, but won't try to fetch gene data:
            if "genes" not in panel_data:
                logging.warning(f"Panel info could not be retrieved for panel ID: {panel_id}")
                continue

            for gene in panel_data["genes"]:
                for publication_string in gene["publications"]:
                    publications.extend(
                        [
                            (panel_id, gene["gene_data"]["gene_symbol"], pubmed_id)
                            for pubmed_id in self.extract_pubmed_ids(publication_string)
                        ]
                    )
        # Group by (panel ID, gene symbol) pairs and convert into a PySpark dataframe.
        return (
            self.spark.createDataFrame(
                publications, schema=("Panel ID", "Symbol", "literature")
            )
            .groupby(["Panel ID", "Symbol"])
            .agg(f.collect_set("literature").alias("literature"))
        )

    def extract_pubmed_ids(self, publication_string):
        """Parses the publication information from the PanelApp API and extracts PubMed IDs."""
        publication_string = (
            publication_string.encode("ascii", "ignore")
            .decode()  # To get rid of zero width spaces and other special characters.
            .strip()
            .replace("\n", "")
            .replace("\r", "")
        )
        pubmed_ids = []

        if not re.match(self.PMID_FILTER_OUT_RE, publication_string):
            for regexp in self.PMID_RE:  # For every known representation pattern...
                for occurrence in re.findall(
                    regexp, publication_string
                ):  # For every occurrence of this pattern...
                    pubmed_ids.extend(
                        re.findall(r"(\d+)", occurrence)
                    )  # Extract all digit sequences (PubMed IDs).

        # Filter out:
        # * 0 as a value, because it is a placeholder for a missing ID;
        # * PubMed IDs which are too short or too long.
        pubmed_ids = {
            pubmed_id
            for pubmed_id in pubmed_ids
            if pubmed_id != "0" and len(pubmed_id) <= 8
        }

        if self.debug_output_pmids_filename:
            # Output the original PMID string, the extracted PMIDs, and the total number of the records extracted.
            self.debug_output_pmids_data.add(
                f'{publication_string}\t{" ".join(sorted(pubmed_ids))}\t{len(pubmed_ids)}\n'
            )

        return pubmed_ids


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--input-file",
        required=True,
        type=str,
        help="Input TSV file with the table containing association details.",
    )
    parser.add_argument(
        "--output-file",
        required=True,
        type=str,
        help="Output JSON file (gzip-compressed) with the evidence strings.",
    )
    parser.add_argument(
        "--debug-output-prefix",
        required=False,
        type=str,
        help="If specified, a number of files will be created with this prefix to store phenotype/PMID cleanup data "
        "for debugging purposes.",
    )
    parser.add_argument(
        "--cache_dir",
        required=False,
        help="Directory to store the OnToma cache files in.",
    )

    args = parser.parse_args()
    PanelAppEvidenceGenerator(args.debug_output_prefix).generate_panelapp_evidence(
        input_file=args.input_file,
        output_file=args.output_file,
        cache_dir=args.cache_dir,
    )
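To see how the two PMID_RE patterns behave, here is a condensed, pure-Python sketch applied to the example strings quoted in the pattern comments. It prints a plain set of IDs and omits the additional filtering performed by extract_pubmed_ids.

# Condensed illustration of the PMID extraction patterns (examples taken from the comments above).
import re

pmid_patterns = [
    r"^[\d, ]+(?: |$)",              # Pattern 1: bare comma-separated IDs at the start of the string.
    r"(?:PubMed|PMID)[: ]*[\d, ]+",  # Pattern 2: IDs prefixed with "PubMed" or "PMID".
]

for text in ["28055140, 27333055, 23063529", "observed in the patient. PMID: 1908107 - publication"]:
    pubmed_ids = set()
    for pattern in pmid_patterns:
        for occurrence in re.findall(pattern, text):
            pubmed_ids.update(re.findall(r"(\d+)", occurrence))
    print(text, "->", sorted(pubmed_ids))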
import sys
import argparse
import gzip
import logging
import json

from pyspark.sql import SparkSession
import pyspark.sql.functions as F


class progenyEvidenceGenerator:

    def __init__(self):
        # Create spark session
        self.spark = (
            SparkSession.builder
            .appName('progeny')
            .getOrCreate()
        )

        # Initialize source table
        self.dataframe = None

    def generateEvidenceFromSource(self, inputFile, diseaseMapping, pathwayMapping, skipMapping):
        '''
        Process the input file and build the evidence strings from its rows.
        Returns:
            evidences (list): dictionaries with the evidence strings generated from the source file
        '''
        # Read input file
        self.dataframe = (
            self.spark.read
            .option('header', 'true')
            .option('delimiter', '\t')
            .option('inferSchema', 'true')
            .csv(inputFile)
        )

        # Disease mapping step
        if not skipMapping:
            try:
                self.dataframe = self.cancer2EFO(diseaseMapping)
                logging.info('Disease mappings have been imported.')
            except Exception as e:
                logging.error(f'An error occurred while importing disease mappings: \n{e}.')
        else:
            logging.info('Disease mapping has been skipped.')
            self.dataframe = self.dataframe.withColumn('EFO_id', F.lit(None))

        self.dataframe = self.pathway2Reactome(pathwayMapping)
        logging.info('Pathway to reaction ID mappings have been imported.')

        # Build evidence strings per row
        logging.info('Generating evidence:')
        evidences = (
            self.dataframe.rdd
            .map(progenyEvidenceGenerator.parseEvidenceString)
            .collect()  # list of dictionaries
        )

        return evidences

    def cancer2EFO(self, diseaseMapping):
        diseaseMappingsFile = (
            self.spark
            # Use the look-up table provided on the command line; fall back to the bundled resource file.
            .read.csv(diseaseMapping or 'resources/cancer2EFO_mappings.tsv', sep=r'\t', header=True)
            .select('Cancer_type_acronym', 'EFO_id')
            .withColumnRenamed('Cancer_type_acronym', 'Cancer_type')
        )

        self.dataframe = self.dataframe.join(
            diseaseMappingsFile,
            on='Cancer_type',
            how='left'
        )

        return self.dataframe

    def pathway2Reactome(self, pathwayMapping):

        pathwayMappingsFile = (
            self.spark
            # Use the pathway look-up table provided on the command line.
            .read.csv(pathwayMapping, sep=r'\t', header=True)
            .withColumnRenamed('pathway', 'Pathway')
        )

        self.dataframe = (
            self.dataframe
            .join(pathwayMappingsFile, on='Pathway', how='inner')
            .withColumn('target', F.split(F.col('target'), ', '))
            .withColumn('target', F.explode('target'))
        )

        return self.dataframe

    @staticmethod
    def parseEvidenceString(row):
        evidence = {
            'datasourceId': 'progeny',
            'datatypeId': 'affected_pathway',
            'resourceScore': row['P.Value'],
            'targetFromSourceId': row['target'],
            'diseaseFromSource': row['Cancer_type'],
            'pathways': [
                {
                    'id': row['reactomeId'],
                    'name': row['description']
                }
            ]
        }

        # Skipping unmapped diseases:
        if row['EFO_id']:
            evidence['diseaseFromSourceMappedId'] = row['EFO_id']

        return evidence


def main(inputFile, diseaseMapping, pathwayMapping, outputFile, skipMapping):

    # Logging parameters
    logging.info(f'PROGENy input table: {inputFile}')
    logging.info(f'Cancer type to EFO ID table: {diseaseMapping}')
    logging.info(f'Pathway to reaction ID table: {pathwayMapping}')
    logging.info(f'Output file: {outputFile}')

    # Initialize evidence builder object
    evidenceBuilder = progenyEvidenceGenerator()

    # Writing evidence strings into a json file
    evidences = evidenceBuilder.generateEvidenceFromSource(inputFile, diseaseMapping, pathwayMapping, skipMapping)

    with gzip.open(outputFile, 'wt') as f:
        for evidence in evidences:
            json.dump(evidence, f)
            f.write('\n')

    logging.info(f'{len(evidences)} evidence strings saved into {outputFile}. Exiting.')


if __name__ == '__main__':

    # Initiating parser
    parser = argparse.ArgumentParser(description='This script generates evidence for the PROGENy data source.')

    parser.add_argument('-i', '--inputFile', required=True, type=str, help='Input source .tsv file.')
    parser.add_argument('-d', '--diseaseMapping', required=False, type=str, help='Input look-up table containing the cancer type mappings to an EFO ID.')
    parser.add_argument('-p', '--pathwayMapping', required=True, type=str, help='Input look-up table containing the pathway mappings to a respective target and ID in Reactome.')
    parser.add_argument('-o', '--outputFile', required=True, type=str, help='Gzipped JSON file containing the evidence strings.')
    parser.add_argument('-s', '--skipMapping', required=False, action='store_true', help='State whether to skip the disease to EFO mapping step.')
    parser.add_argument('-l', '--logFile', help='Destination of the logs generated by this script.', type=str, required=False)

    # Parsing parameters
    args = parser.parse_args()
    inputFile = args.inputFile
    diseaseMapping = args.diseaseMapping
    pathwayMapping = args.pathwayMapping
    outputFile = args.outputFile
    skipMapping = args.skipMapping

    # Initialize logging:
    logging.basicConfig(
        level=logging.INFO,
        format='%(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    # basicConfig above already sends logs to stderr; optionally also write them to a file.
    if args.logFile:
        file_handler = logging.FileHandler(args.logFile)
        file_handler.setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s'))
        logging.getLogger().addHandler(file_handler)

    main(inputFile, diseaseMapping, pathwayMapping, outputFile, skipMapping)
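The pathway2Reactome step splits the comma-separated target field and explodes it into one row per gene; a minimal sketch of that transformation on a toy row:

# Toy illustration of the split + explode step (placeholder values, not real PROGENy records).
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName('explode_sketch').getOrCreate()

df = spark.createDataFrame([('PathwayX', 'GENE1, GENE2, GENE3')], ['Pathway', 'target'])
(
    df
    .withColumn('target', F.split(F.col('target'), ', '))  # string -> array of gene symbols
    .withColumn('target', F.explode('target'))             # array -> one row per gene
    .show()
)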
import sys
import argparse
import gzip
import logging
import json

from pyspark.sql import SparkSession
import pyspark.sql.functions as F


class SLAPEnrichEvidenceGenerator:

    def __init__(self):
        # Create spark session
        self.spark = (
            SparkSession.builder
            .appName('SLAPEnrich')
            .getOrCreate()
        )
        logging.info(f'Spark version: {self.spark.version}')

        # Initialize source table
        self.dataframe = None

    def generateEvidenceFromSource(self, inputFile, diseaseMapping, skipMapping):
        '''
        Process the input file and build the evidence strings from its rows.
        Returns:
            evidences (list): dictionaries with the evidence strings generated from the source file
        '''
        # Read input file
        self.dataframe = (
            self.spark
            .read.csv(inputFile, sep=r'\t', header=True, inferSchema=True)
            .select('ctype', 'gene', 'pathway', 'SLAPEnrichPval')
            .withColumnRenamed('ctype', 'Cancer_type_acronym')
            .withColumnRenamed('SLAPEnrichPval', 'pval')
            .withColumn('pathwayId', F.split(F.col('pathway'), ': ').getItem(0))
            .withColumn('pathwayDescription', F.split(F.col('pathway'), ': ').getItem(1))
        )

        # Filter by p-value
        self.dataframe = self.dataframe.filter(F.col('pval') < 1e-4)

        # Mapping step
        if not skipMapping:
            try:
                self.dataframe = self.cancer2EFO(diseaseMapping)
                logging.info('Disease mappings have been imported.')
            except Exception as e:
                logging.error(f'An error occurred while importing disease mappings: \n{e}.')
        else:
            logging.info('Disease mapping has been skipped.')
            self.dataframe = self.dataframe.withColumn('EFO_id', F.lit(None))

        # Build evidence strings per row
        logging.info('Generating evidence:')
        evidences = (
            self.dataframe.rdd
            .map(SLAPEnrichEvidenceGenerator.parseEvidenceString)
            .collect()
        )  # list of dictionaries

        return evidences

    def cancer2EFO(self, diseaseMapping):
        diseaseMappingsFile = (
            self.spark
            .read.csv(diseaseMapping, sep=r'\t', header=True)
            .select('Cancer_type_acronym', 'EFO_id')
        )

        self.dataframe = self.dataframe.join(
            diseaseMappingsFile,
            on='Cancer_type_acronym',
            how='left'
        )

        return self.dataframe

    @staticmethod
    def parseEvidenceString(row):
        evidence = {
            'datasourceId': 'slapenrich',
            'datatypeId': 'affected_pathway',
            'resourceScore': row['pval'],
            'targetFromSourceId': row['gene'],
            'diseaseFromSource': row['Cancer_type_acronym'],
            'pathways': [
                {
                    'id': row['pathwayId'],
                    'name': row['pathwayDescription']
                }
            ]
        }

        # For unmapped diseases, we skip the mappedID key:
        if row['EFO_id']:
            evidence['diseaseFromSourceMappedId'] = row['EFO_id']

        return evidence


def main(inputFile, diseaseMapping, outputFile, skipMapping):

    # Logging parameters
    logging.info(f'SLAPEnrich input table: {inputFile}')
    logging.info(f'Cancer type to EFO ID table: {diseaseMapping}')
    logging.info(f'Output file: {outputFile}')

    # Initialize evidence builder object
    evidenceBuilder = SLAPEnrichEvidenceGenerator()

    # Writing evidence strings into a json file
    evidences = evidenceBuilder.generateEvidenceFromSource(inputFile, diseaseMapping, skipMapping)

    with gzip.open(outputFile, 'wt') as f:
        for evidence in evidences:
            json.dump(evidence, f)
            f.write('\n')
    logging.info(f'{len(evidences)} evidence strings saved into {outputFile}. Exiting.')


if __name__ == '__main__':

    # Initiating parser
    parser = argparse.ArgumentParser(description='This script generates evidence for the SLAPEnrich data source.')

    parser.add_argument('-i', '--inputFile', required=True, type=str, help='Input source .tsv file.')
    parser.add_argument('-d', '--diseaseMapping', required=False, type=str,
                        help='Input look-up table containing the cancer type mappings to an EFO ID.')
    parser.add_argument('-o', '--outputFile', required=True, type=str,
                        help='Gzipped JSON file containing the evidence strings.')
    parser.add_argument('-s', '--skipMapping', required=False, action='store_true',
                        help='State whether to skip the disease to EFO mapping step.')
    parser.add_argument('-l', '--logFile', help='Destination of the logs generated by this script.',
                        type=str, required=False)

    # Parsing parameters
    args = parser.parse_args()
    inputFile = args.inputFile
    diseaseMapping = args.diseaseMapping
    outputFile = args.outputFile
    skipMapping = args.skipMapping

    # Initialize logging:
    logging.basicConfig(
        level=logging.INFO,
        format='%(name)s - %(levelname)s - %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    # basicConfig above already sends logs to stderr; optionally also write them to a file.
    if args.logFile:
        file_handler = logging.FileHandler(args.logFile)
        file_handler.setFormatter(logging.Formatter('%(name)s - %(levelname)s - %(message)s'))
        logging.getLogger().addHandler(file_handler)

    main(inputFile, diseaseMapping, outputFile, skipMapping)
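The pathway column is split into an identifier and a description using getItem; a minimal sketch on a toy value (the "ID: description" shape is assumed from the code above):

# Toy illustration of splitting the pathway column into an ID and a description.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName('pathway_split_sketch').getOrCreate()

df = spark.createDataFrame([('PATHWAY-ID-0000: Example pathway description',)], ['pathway'])
(
    df
    .withColumn('pathwayId', F.split(F.col('pathway'), ': ').getItem(0))
    .withColumn('pathwayDescription', F.split(F.col('pathway'), ': ').getItem(1))
    .show(truncate=False)
)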
import math
import argparse
import logging
import sys

import pandas as pd


def renormalize(n, start_range, new_range=(0.5, 1)):
    """
    Scale a value from a given range to a new range.

    Apply the function f(x) to n using an old range (start_range) and a new range,
    where f(x) = (dNewRange / dOldRange * (n - old_range_lower_bound)) + new_lower
    """

    delta1 = start_range[1] - start_range[0]
    delta2 = new_range[1] - new_range[0]

    max_new_range = max(new_range)
    min_new_range = min(new_range)

    if delta1 or delta2:
        try:
            normalized = (delta2 * (n - start_range[0]) / delta1) + new_range[0]
        except ZeroDivisionError:
            normalized = new_range[0]
    else:
        normalized = n

    # The formula can yield values slightly outside the boundaries of the new range; clamp them.
    if normalized > max_new_range:
        return max_new_range

    elif normalized < min_new_range:
        return min_new_range

    return round(normalized, 4)


def generate_score(row):
    """
    Score generation depends on the score type.
    """
    score_type = row['score_type']
    score = row['score']
    min_score = row['min_score']
    max_score = row['max_score']

    if score_type == 'p-value':
        parsed_score = renormalize(math.log10(score), [math.log10(max_score), math.log10(min_score)])
    elif score_type == 'rank':
        parsed_score = renormalize(score, [min_score, max_score])
    else:
        parsed_score = 0.75

    return parsed_score


def main(studyFile, evidenceFile, out_file):

    logging.info(f'Output file: {out_file}')

    # Reading evidence:
    logging.info(f'Evidence file: {evidenceFile}')
    evidence_df = pd.read_csv(evidenceFile, sep='\t')
    logging.info(f'Number of evidence rows: {len(evidence_df)}')
    logging.info(f'Number of targets: {len(evidence_df.target_id.unique())}')
    logging.info(f'Number of diseases: {len(evidence_df.disease_id.unique())}')

    # Reading study file:
    logging.info(f'Study description file: {studyFile}')
    publication_df = pd.read_csv(studyFile, sep='\t')
    logging.info(f'Number of studies: {len(publication_df)}')

    # Merging publication with evidence data:
    merged = evidence_df.merge(publication_df.drop('pmid', axis=1), on='gene_set_name', how='outer', indicator=True)

    # Checking if merging worked just fine:
    if len(merged.loc[merged._merge != 'both']) != 0:
        logging.warning(f'{len(merged.loc[merged._merge != "both"])} rows could not be joined.')
        logging.warning(merged.loc[merged._merge != "both"])

    # Generate evidence:
    merged = (
        merged
        .assign(
            diseaseFromSourceMappedId=merged.disease_id.apply(lambda x: x.split('/')[-1]),
            datasourceId='sysbio',
            datatypeId='affected_pathway',
            literature=merged.pmid.apply(lambda x: [str(x)]),
            pathways=merged.gene_set_name.apply(lambda x: [{'name': x}]),
            resourceScore=merged.apply(generate_score, axis=1)
        )
        .rename(columns={
            'target_id': 'targetFromSourceId',
            'disease_name': 'diseaseFromSource',
            'method': 'studyOverview'
        })
        .drop(['_merge', 'max_score', 'min_score', 'score_type', 'score', 'disease_id', 'pmid', 'gene_set_name'], axis=1)
        .to_json(out_file, compression='gzip', orient='records', lines=True)
    )

    logging.info('Evidence generation finished.')


if __name__ == "__main__":

    # Parse CLI arguments
    parser = argparse.ArgumentParser(description='Generating target/disease evidence strings from SystemsBiology data.')
    parser.add_argument('-e', '--evidenceFile',
                        help='Name of tsv file with the per gene evidence file.',
                        type=str, required=True)
    parser.add_argument('-s', '--studyFile',
                        help='Name of tsv file with the study descriptions.',
                        type=str, required=True)
    parser.add_argument('-o', '--outputFile',
                        help='Name of gzip compressed json output file.',
                        type=str, required=True)
    parser.add_argument('-l', '--logFile',
                        help='Name of log file. If not provided logs are written to standard error.',
                        type=str, required=False)

    args = parser.parse_args()
    studyFile = args.studyFile
    evidenceFile = args.evidenceFile
    out_file = args.outputFile

    # If no logfile is specified, logs are written to stderr
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    if args.logFile:
        file_handler = logging.FileHandler(args.logFile)
        file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s'))
        logging.getLogger().addHandler(file_handler)

    main(studyFile, evidenceFile, out_file)
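A quick worked example of the p-value branch of generate_score, with illustrative numbers only: a p-value of 1e-6 in a study whose scores range from 0.05 (weakest) down to 1e-8 (strongest) is log-transformed and rescaled onto [0.5, 1], so stronger p-values end up closer to 1.

# Worked example of the p-value rescaling (illustrative numbers, not taken from any real study).
import math

old_range = [math.log10(0.05), math.log10(1e-8)]  # approximately [-1.30, -8.0]
new_range = [0.5, 1.0]
n = math.log10(1e-6)

scaled = (new_range[1] - new_range[0]) * (n - old_range[0]) / (old_range[1] - old_range[0]) + new_range[0]
print(round(scaled, 4))  # ~0.8507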
import argparse
from functools import reduce
import logging
import sys
from typing import Optional

from pyspark import SparkFiles
from pyspark.sql.dataframe import DataFrame
import pyspark.sql.functions as F

from common.evidence import initialize_sparksession, write_evidence_strings


def main(
    toxcast: str,
    output: str,
    adverse_events: str,
    safety_risk: str,
    aopwiki: str,
    log_file: Optional[str] = None,
):
    """
    This module puts together data from different sources that describe target safety liabilities.

    Args:
        adverse_events: Input TSV containing adverse events associated with targets that have been collected from relevant publications. Fetched from GitHub.
        safety_risk: Input TSV containing cardiovascular safety liabilities associated with targets that have been collected from relevant publications. Fetched from GitHub.
        toxcast: Input table containing biological processes associated with relevant targets that have been observed in toxicity assays.
        aopwiki: Input JSON with target safety information extracted from the AOP Wiki.
        output: Output gzipped json file following the target safety liabilities data model.
        log_file: Destination of the logs generated by this script. Defaults to None.
    """

    # Logger initializer. If no log_file is specified, logs are written to stderr
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    if log_file:
        file_handler = logging.FileHandler(log_file)
        file_handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s"))
        logging.getLogger().addHandler(file_handler)

    # Initialize spark context
    global spark
    spark = initialize_sparksession()
    spark.sparkContext.addFile(adverse_events)
    spark.sparkContext.addFile(safety_risk)
    logging.info("Remote files successfully added to the Spark Context.")

    # Load and process the input files into dataframes
    ae_df = process_adverse_events(SparkFiles.get(adverse_events.split("/")[-1]))
    sr_df = process_safety_risk(SparkFiles.get(safety_risk.split("/")[-1]))
    toxcast_df = process_toxcast(toxcast)
    aopwiki_df = process_aop(aopwiki)
    logging.info("Data has been processed. Merging...")

    # Combine dfs and group evidence
    evidence_unique_cols = [
        "id",
        "targetFromSourceId",
        "event",
        "eventId",
        "datasource",
        "effects",
        "isHumanApplicable",
        "literature",
        "url",
    ]
    safety_dfs = [ae_df, sr_df, toxcast_df, aopwiki_df]
    safety_df = (
        reduce(
            lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), safety_dfs
        )
        # Collect biosample and study metadata by grouping on the unique evidence fields
        .groupBy(evidence_unique_cols)
        .agg(
            F.collect_set(F.col("biosample")).alias("biosamples"),
            F.collect_set(F.col("study")).alias("studies"),
        )
        .withColumn(
            "biosamples", F.when(F.size("biosamples") != 0, F.col("biosamples"))
        )
        .withColumn("studies", F.when(F.size("studies") != 0, F.col("studies")))
    )

    # Write output
    logging.info("Evidence strings have been processed. Saving...")
    write_evidence_strings(safety_df, output)
    logging.info(
        f"{safety_df.count()} evidence of safety liabilities have been saved to {output}. Exiting."
    )

    return 0


def process_aop(aopwiki: str) -> DataFrame:
    """Loads and processes the AOPWiki input JSON."""

    return (
        spark.read.json(aopwiki)
        # If isHumanApplicable is False, set it to null: the flag only reflects a lack of data,
        # not evidence that the effect is inapplicable to humans.
        .withColumn(
            "isHumanApplicable",
            F.when(
                F.col("isHumanApplicable") != F.lit(True), F.col("isHumanApplicable")
            ),
        )
        # Data bug: some events have the substring "NA" at the start; remove it and trim the string.
        .withColumn("event", F.trim(F.regexp_replace(F.col("event"), "^NA", "")))
        # Data bug: effects.direction values need to be mapped onto the allowed values, as this field is an enum.
        .withColumn(
            "effects",
            F.transform(
                F.col("effects"),
                lambda x: F.struct(
                    F.when(
                        x.direction == "Activation",
                        F.lit("Activation/Increase/Upregulation"),
                    )
                    .when(
                        x.direction == "Inhibition",
                        F.lit("Inhibition/Decrease/Downregulation"),
                    )
                    .alias("direction"),
                    x.dosing.alias("dosing"),
                ),
            ),
        )
        # Explode the biosamples array into individual biosample structs so the data is shaped the same way as in the rest of the sources
        .withColumn("biosample", F.explode_outer("biosamples"))
    )


def process_adverse_events(adverse_events: str) -> DataFrame:
    """
    Loads and processes the adverse events input TSV.

    Ex. input record:
        biologicalSystem | gastrointestinal
        effect           | activation_general
        efoId            | EFO_0009836
        ensemblId        | ENSG00000133019
        pmid             | 23197038
        ref              | Bowes et al. (2012)
        symptom          | bronchoconstriction
        target           | CHRM3
        uberonCode       | UBERON_0005409
        url              | null

    Ex. output record:
        id         | ENSG00000133019
        event      | bronchoconstriction
        datasource | Bowes et al. (2012)
        eventId    | EFO_0009836
        literature | 23197038
        url        | null
        biosample  | {gastrointestinal, UBERON_0005409, null, null, null}
        effects    | [{Activation/Increase/Upregulation, general}]
    """

    ae_df = (
        spark.read.csv(adverse_events, sep="\t", header=True)
        .select(
            F.col("ensemblId").alias("id"),
            F.col("symptom").alias("event"),
            F.col("efoId").alias("eventId"),
            F.col("ref").alias("datasource"),
            F.col("pmid").alias("literature"),
            "url",
            F.struct(
                F.col("biologicalSystem").alias("tissueLabel"),
                F.col("uberonCode").alias("tissueId"),
                F.lit(None).alias("cellLabel"),
                F.lit(None).alias("cellFormat"),
                F.lit(None).alias("cellId"),
            ).alias("biosample"),
            F.split(F.col("effect"), "_").alias("effects"),
        )
        .withColumn(
            "effects",
            F.struct(
                F.when(
                    F.col("effects")[0].contains("activation"),
                    F.lit("Activation/Increase/Upregulation"),
                )
                .when(
                    F.col("effects")[0].contains("inhibition"),
                    F.lit("Inhibition/Decrease/Downregulation"),
                )
                .alias("direction"),
                F.element_at(F.col("effects"), 2).alias("dosing"),
            ),
        )
    )

    # Multiple dosing effects need to be grouped in the same record.
    effects_df = ae_df.groupBy("id", "event", "datasource").agg(
        F.collect_set(F.col("effects")).alias("effects")
    )
    ae_df = ae_df.drop("effects").join(
        effects_df, on=["id", "event", "datasource"], how="left"
    )

    return ae_df


def process_safety_risk(safety_risk: str) -> DataFrame:
    """
    Loads and processes the safety risk information input TSV.

    Ex. input record:
        biologicalSystem | cardiovascular sy...
        ensemblId        | ENSG00000132155
        event            | heart disease
        eventId          | EFO_0003777
        liability        | Important for the...
        pmid             | 21283106
        ref              | Force et al. (2011)
        target           | RAF1
        uberonId         | UBERON_0004535

    Ex. output record:
        id         | ENSG00000132155
        event      | heart disease
        eventId    | EFO_0003777
        literature | 21283106
        datasource | Force et al. (2011)
        biosample  | {cardiovascular s...
        study      | {Important for th...
    """

    return (
        spark.read.csv(safety_risk, sep="\t", header=True)
        .select(
            F.col("ensemblId").alias("id"),
            "event",
            "eventId",
            F.col("pmid").alias("literature"),
            F.col("ref").alias("datasource"),
            F.struct(
                F.col("biologicalSystem").alias("tissueLabel"),
                F.col("uberonId").alias("tissueId"),
                F.lit(None).alias("cellLabel"),
                F.lit(None).alias("cellFormat"),
                F.lit(None).alias("cellId"),
            ).alias("biosample"),
            F.struct(
                F.col("liability").alias("description"),
                F.lit(None).alias("name"),
                F.lit(None).alias("type"),
            ).alias("study"),
        )
        .withColumn(
            "event",
            F.when(F.col("datasource").contains("Force"), "heart disease").when(
                F.col("datasource").contains("Lamore"), "cardiac arrhythmia"
            ),
        )
        .withColumn(
            "eventId",
            F.when(F.col("datasource").contains("Force"), "EFO_0003777").when(
                F.col("datasource").contains("Lamore"), "EFO_0004269"
            ),
        )
    )


def process_toxcast(toxcast: str) -> DataFrame:
    """
    Loads and processes the ToxCast input table.

    Ex. input record:
        assay_component_endpoint_name | ACEA_ER_80hr
        assay_component_desc          | ACEA_ER_80hr, is ...
        biological_process_target     | cell proliferation
        tissue                        | null
        cell_format                   | cell line
        cell_short_name               | T47D
        assay_format_type             | cell-based
        official_symbol               | ESR1
        eventId                       | null

    Ex. output record:
    targetFromSourceId | ESR1
    event              | cell proliferation
    eventId            | null
    biosample          | {null, null, T47D...
    datasource         | ToxCast
    url                | https://www.epa.g...
    study              | {ACEA_ER_80hr, AC...
    """

    return spark.read.csv(toxcast, sep="\t", header=True).select(
        F.trim(F.col("official_symbol")).alias("targetFromSourceId"),
        F.col("biological_process_target").alias("event"),
        "eventId",
        F.struct(
            F.col("tissue").alias("tissueLabel"),
            F.lit(None).alias("tissueId"),
            F.col("cell_short_name").alias("cellLabel"),
            F.col("cell_format").alias("cellFormat"),
            F.lit(None).alias("cellId"),
        ).alias("biosample"),
        F.lit("ToxCast").alias("datasource"),
        F.lit(
            "https://www.epa.gov/chemical-research/exploring-toxcast-data-downloadable-data"
        ).alias("url"),
        F.struct(
            F.col("assay_component_endpoint_name").alias("name"),
            F.col("assay_component_desc").alias("description"),
            F.col("assay_format_type").alias("type"),
        ).alias("study"),
    )


def get_parser():
    """Get parser object for script TargetSafety.py."""
    parser = argparse.ArgumentParser(description=__doc__)

    parser.add_argument(
        "--toxcast",
        help="Input table containing biological processes associated with relevant targets that have been observed in toxicity assays.",
        type=str,
        required=True,
    )
    parser.add_argument(
        "--adverse_events",
        help="Input TSV containing adverse events associated with targets that have been collected from relevant publications. Fetched from https://raw.githubusercontent.com/opentargets/curation/master/target_safety/adverse_effects.tsv.",
        type=str,
        required=True,
    )
    parser.add_argument(
        "--safety_risk",
        help="Input TSV containing cardiovascular safety liabilities associated with targets that have been collected from relevant publications. Fetched from https://raw.githubusercontent.com/opentargets/curation/master/target_safety/safety_risks.tsv.",
        type=str,
        required=True,
    )
    parser.add_argument(
        "--aopwiki",
        help="Input JSON containing targets implicated in adverse outcomes as reported by the AOPWiki. Parsed from their source XML data.",
        type=str,
        required=True,
    )
    parser.add_argument(
        "--output",
        help="Output gzipped json file following the target safety liabilities data model.",
        type=str,
        required=True,
    )
    parser.add_argument(
        "--log_file",
        help="Destination of the logs generated by this script. Defaults to None",
        type=str,
        nargs="?",
        default=None,
    )

    return parser


if __name__ == "__main__":
    args = get_parser().parse_args()

    main(
        toxcast=args.toxcast,
        adverse_events=args.adverse_events,
        safety_risk=args.safety_risk,
        aopwiki=args.aopwiki,
        output=args.output,
    )
import argparse
import logging
import sys

from pyspark.sql import functions as f
from pyspark import SparkFiles

from common.evidence import (
    write_evidence_strings,
    initialize_sparksession,
)

# The TEP dataset is made available by the SGC as a tab-separated file:
TEPURL = "https://www.thesgc.org/sites/default/files/fileuploads/available-teps.tsv"


def main(outputFile: str) -> None:

    # Initialize spark session
    spark = initialize_sparksession()
    spark.sparkContext.addFile(TEPURL)

    # Fetching and processing the TEP table (it is saved as a JSON file further below):
    TEP_df = (
        spark.read.csv(SparkFiles.get(TEPURL.split("/")[-1]), sep="\t", header=True)
        # Generating TEP url from the Gene column: SLC12A4/SLC12A6 -> https://www.thesgc.org/tep/slc12a4slc12a6
        .withColumn(
            "url",
            f.concat(
                f.lit("https://www.thesgc.org/tep/"),
                f.regexp_replace(f.lower(f.col("Gene")), "/", ""),
            ),
        )
        # Exploding TEPs, where multiple genes are given:
        .withColumn("targetFromSourceId", f.explode(f.split(f.col("Gene"), "/")))
        # Renaming columns:
        .withColumnRenamed("Therapeutic Area", "therapeuticArea")
        .withColumnRenamed("Description", "description")
        # Dropping columns:
        .drop(*["Gene", "version", "Date"])
        .persist()
    )

    logging.info("TEP dataset has been downloaded and formatted.")
    logging.info(f"Number of TEPs: {TEP_df.count()}")
    logging.info(
        f'Number of unique genes: {TEP_df.select("targetFromSourceId").distinct().count()}'
    )

    # Saving data:
    write_evidence_strings(TEP_df, outputFile)
    logging.info(f"TEP dataset is written to {outputFile}.")


if __name__ == "__main__":

    # Reading output file name from the command line:
    parser = argparse.ArgumentParser(
        description="This script fetches TEP data from Structural Genomics Consortium."
    )
    parser.add_argument(
        "--output_file", "-o", type=str, help="Output file. gzipped JSON", required=True
    )
    parser.add_argument(
        "--log_file",
        type=str,
        help="File into which the logs are saved",
        required=False,
    )
    args = parser.parse_args()

    # If no logfile is specified, logs are written to the standard error:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    if args.log_file:
        # Write logs to the requested file as well; basicConfig above already covers stderr.
        logging.getLogger().addHandler(logging.FileHandler(args.log_file))

    main(args.output_file)
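
The URL rule used in the parser above can be illustrated with a minimal, standalone Python sketch (illustrative only, not part of the parser; the gene value is taken from the comment in the code):

# Illustrative sketch: lowercase the Gene value and strip the slash, as the parser does with Spark functions.
gene = "SLC12A4/SLC12A6"
url = "https://www.thesgc.org/tep/" + gene.lower().replace("/", "")
print(url)  # https://www.thesgc.org/tep/slc12a4slc12a6
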
import argparse
import logging
import os
import sys
from functools import reduce

from scipy import stats as st

from typing import Dict, List, Any

from pyspark.sql import SparkSession, functions as f, types as t
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.window import Window

from common.evidence import (
    initialize_sparksession,
    write_evidence_strings,
    GenerateDiseaseCellLines,
    read_ppp_config,
)


class EncoreEvidenceGenerator:
    """This parser generates disease/target evidence based on datafiles submitted by the Encore team.

    Input for the parser:
        - JSON configuration describing the experiments.

    For each experiment the following input files are expected:
        - lfc_file: log fold change values for every gene pair, for every experiment.
        - bliss_file: the analysed cooperative effect for each gene pair calculated by the BLISS analysis.
        - gemini_file: the analysed cooperative effect for each gene pair calculated by the Gemini analysis.

    The parser joins the lfc data with the cooperativity measurements by gene pair and cell line.
    Filters applied to gene pairs:
        - only extract gene pairs with significant log fold change.
        - only extract gene pairs with significant cooperative effect.
    """

    def __init__(
        self, spark: SparkSession, cell_passport_df: DataFrame, shared_parameters: dict
    ) -> None:
        self.spark = spark
        self.cell_passport_df = cell_passport_df

        # Parsing parameters:
        self.log_fold_change_cutoff_p_val = shared_parameters["logFoldChangeCutoffPVal"]
        self.log_fold_change_cutoff_FDR = shared_parameters["logFoldChangeCutoffFDR"]
        self.interaction_cutoff_p_val = shared_parameters["blissCutoffPVal"]
        self.data_folder = shared_parameters["data_folder"]
        self.release_version = shared_parameters["releaseVersion"]
        self.release_date = shared_parameters["releaseDate"]

    @staticmethod
    @f.udf(
        t.ArrayType(
            t.StructType(
                [
                    t.StructField("targetFromSourceId", t.StringType(), False),
                    t.StructField("targetRole", t.StringType(), False),
                    t.StructField(
                        "interactingTargetFromSourceId", t.StringType(), False
                    ),
                    t.StructField("interactingTargetRole", t.StringType(), False),
                ]
            )
        )
    )
    def parse_targets(gene_pair: str, gene_role: str) -> List[Dict[Any, Any]]:
        """The gene pair string is split and assigned to the relevant role + exploding into evidence of two targets.

        gene pair: 'SHC1~ADAD1', where 'SHC1' is the library gene, and 'ADAD1' is the anchor gene.

        Both genes will be targetFromSource AND interactingTargetFromSource, while keeping their roles.
        """
        genes = gene_pair.split("~")
        roles = [gene_role.replace("Combinations", "").lower(), "anchor"]

        assert len(genes) == 2
        parsed = []

        for i, (gene, role) in enumerate(zip(genes, roles)):
            parsed.append(
                {
                    "targetFromSourceId": gene,
                    "targetRole": role,
                    "interactingTargetFromSourceId": genes[1] if i == 0 else genes[0],
                    "interactingTargetRole": roles[1] if i == 0 else roles[0],
                }
            )

        return parsed

    def get_lfc_data(self, lfc_file: str) -> DataFrame:
        """
        The table contains results from CRISPR/Cas9 experiments: the p-value, false discovery rate (fdr)
        and log fold-change describe the observed survival of the cells.

        Each row is a gene pair; columns contain results from the measurements. Column names contain
        information on the tested cell line (SIDM) and occasionally the replicate (CSID) identifiers.

        This very wide table is stacked to get only three numeric columns (p-value, fold-change and FDR),
        plus a string column with the cell line.

        Args:
          lfc_file (str): Path to the log fold change file.

        Returns:
          A dataframe with the following columns:
            id
            cellLineName
            Note1
            Note2
            phenotypicConsequenceLogFoldChange
            phenotypicConsequencePValue
            phenotypicConsequenceFDR
        """

        # Fixed statistical field names:
        stats_fields = ["p-value", "fdr", "lfc"]

        # Reading the data into a single dataframe:
        lfc_df = self.spark.read.csv(lfc_file, sep="\t", header=True)

        # Collect the cell lines from the lfc file header:
        cell_lines = set(["_".join(x.split("_")[:-1]) for x in lfc_df.columns[4:]])

        # Generating struct for each cell lines:
        # SIDM are Sanger model identifiers, while CSID are replicate identifiers.
        # SIDM00049_CSID1053_p-value
        # SIDM00049_CSID1053_fdr
        # SIDM00049_CSID1053_lfc
        # Into: SIDM00049_CSID1053: struct(p-value, fdr, lfc)
        expressions = map(
            lambda cell: (
                cell,
                f.struct(
                    [
                        f.col(f"{cell}_{x}").cast(t.FloatType()).alias(x)
                        for x in stats_fields
                    ]
                ),
            ),
            cell_lines,
        )

        # Applying map on the dataframe:
        res_df = reduce(lambda DF, value: DF.withColumn(*value), expressions, lfc_df)

        # Stack the previously generated columns:
        unpivot_expression = f"""stack({len(cell_lines)}, {", ".join([f"'{x}', {x}" for x in cell_lines])} ) as (cellLineName, cellLineData)"""
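        # Illustratively, for two cell line/replicate pairs the generated expression reads:
        #   stack(2, 'SIDM00049_CSID1053', SIDM00049_CSID1053, 'SIDM00050_CSID1054', SIDM00050_CSID1054 )
        #   as (cellLineName, cellLineData)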

        return (
            res_df
            # Unpivot:
            .select("id", "Note1", "Note2", f.expr(unpivot_expression))
            # Extracting and renaming log-fold-change parameters:
            .select(
                "id",
                f.split(f.col("cellLineName"), "_").getItem(0).alias("cellLineName"),
                "Note1",
                "Note2",
                f.col("cellLineData.lfc").alias("phenotypicConsequenceLogFoldChange"),
                f.col("cellLineData.p-value").alias("phenotypicConsequencePValue"),
                f.col("cellLineData.fdr").alias("phenotypicConsequenceFDR"),
            )
        )

    def get_bliss_data(self, bliss_file: str) -> DataFrame:
        """
        It reads a bliss file, extracts the cell lines from the header, generates a struct for each cell
        line, unpivots the data, and finally extracts the bliss score and p-value

        Args:
          bliss_file (str): The path to the bliss file.

        Returns:
          A dataframe with the following columns:
            - id
            - cellLineName
            - geneticInteractionScore
            - geneticInteractionPValue
            - statisticalMethod
        """

        stats_fields = [
            "gene1",
            "gene2",
            "observed",
            "observed_expected",
            "pval",
            "zscore",
        ]
        # Read bliss file:
        bliss_df = self.spark.read.csv(bliss_file, sep="\t", header=True)

        # Collect cell-line/replicate pairs from the headers:
        cell_lines = set(
            [
                "_".join(x.split("_")[0:2])
                for x in bliss_df.columns[4:]
                if x.startswith("SID")
            ]
        )

        # Generating struct for each cell lines:
        expressions = map(
            lambda cell: (
                cell,
                f.struct([f.col(f"{cell}_{x}").alias(x) for x in stats_fields]),
            ),
            cell_lines,
        )

        # Applying map on the dataframe:
        res_df = reduce(lambda DF, value: DF.withColumn(*value), expressions, bliss_df)

        # Stack the previously generated columns:
        unpivot_expression = f"""stack({len(cell_lines)}, {", ".join([f"'{x}', {x}" for x in cell_lines])} ) as (cellLineName, cellLineData)"""

        # Windowing over the cell-line/gene-pair pairs:
        w = Window.partitionBy(["id", "cellLineName"]).orderBy(
            f.asc("geneticInteractionPValue")
        )

        return (
            res_df.select(
                # Create a consistent id column:
                f.regexp_replace(f.col("Gene_Pair"), ";", "~").alias("id"),
                # Unpivot:
                f.expr(unpivot_expression),
            )
            # Extracting and renaming bliss statistical values:
            .select(
                "id",
                f.split(f.col("cellLineName"), "_")[0].alias("cellLineName"),
                f.split(f.col("cellLineName"), "_")[1].alias("experimentId"),
                f.col("cellLineData.zscore").cast(t.FloatType()).alias("zscore"),
            )
            .filter(f.col("zscore") != "NaN")
            # BLISS data is not aggregated for cell lines; instead we have data for each replicate.
            # To allow averaging, data needs to be grouped by gene pair and cell line:
            .groupby("id", "cellLineName")
            # Averaging z-scores:
            .agg(
                f.sum(f.col("zscore")).alias("sumzscore"),
                f.count(f.col("zscore")).alias("experiment_count"),
            )
            .withColumn(
                "geneticInteractionScore",
                f.col("sumzscore") / f.sqrt(f.col("experiment_count")),
            )
            # Calculating p-value for the averaged z-score:
            .withColumn(
                "geneticInteractionPValue",
                f.udf(
                    lambda zscore: float(st.norm.sf(abs(zscore)) * 2)
                    if zscore
                    else None,
                    t.FloatType(),
                )(f.col("geneticInteractionScore")),
            )
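            # Note: summing the replicate z-scores and dividing by sqrt(n) above is Stouffer's method;
            # e.g. a combined z-score of 1.96 corresponds to a two-sided p-value of ~0.05.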
            .select(
                "id",
                "cellLineName",
                f.lit("bliss").alias("statisticalMethod"),
                "geneticInteractionPValue",
                "geneticInteractionScore",
                # Based on the z-score we can tell the nature of the interaction:
                # - positive z-score means synergistic interaction
                # - negative z-score means antagonistic interaction
                f.when(f.col("geneticInteractionScore") > 0, "synergistic")
                .otherwise("antagonistic")
                .alias("geneInteractionType"),
            )
        )

    def get_gemini_data(self, gemini_file: str) -> DataFrame:
        """Parsing cooperative effects calculated by gemini method

        The structure of the file is similar to the lfc file and the processing follows a similar fashion,
        but it is slightly different - different enough to warrant a separate function.
        """

        # Fixed statistical field names:
        stats_fields = ["score", "pval", "FDR"]

        # Reading the data into a single dataframe:
        gemini_df = self.spark.read.csv(gemini_file, sep="\t", header=True)

        # Collect the cell lines from the lfc file header:
        cell_lines = set(
            [
                "_".join(x.split("_")[:-1])
                for x in gemini_df.columns[4:]
                if x.startswith("SID")
            ]
        )

        # There are some problems with how gemini files are joined on the Encore side, which causes serious issues:
        # 1. Multiple Gene_Pair columns in the file -> these will be indexed in the pyspark dataframe.
        # 2. Some columns for some cell lines will be missing, e.g. pvalue for SIDM00049_CSID1053.
        #
        # To mitigate these issues we have to check for the gene pair header and remove cell lines with incomplete data.
        if "Gene_Pair" in gemini_df.columns:
            gene_column = "Gene_Pair"
        elif "Gene_Pair0" in gemini_df.columns:
            gene_column = "Gene_Pair0"
        else:
            raise ValueError(
                f'No \'Gene_Pair\' column in Gemini data: {",".join(gemini_df.columns)}'
            )

        # Check whether all stats columns are available for all cell lines (this stems from a data joining bug at Encore):
        missing_columns = [
            f"{cell}_{stat}"
            for cell in cell_lines
            for stat in stats_fields
            if f"{cell}_{stat}" not in gemini_df.columns
        ]
        cells_to_drop = set(["_".join(x.split("_")[:-1]) for x in missing_columns])

        # If any columns are missing, the relevant cell lines need to be removed from the analysis:
        if missing_columns:
            logging.warning(f'Missing columns: {", ".join(missing_columns)}')
            logging.warning(f'Dropping cell_lines: {", ".join(cells_to_drop)}')

            # Removing missing cell lines:
            cell_lines = [x for x in cell_lines if x not in cells_to_drop]

        # Generating struct for each cell lines:
        expressions = map(
            lambda cell: (
                cell,
                f.struct([f.col(f"{cell}_{x}").alias(x) for x in stats_fields]),
            ),
            cell_lines,
        )

        # Applying map on the dataframe:
        res_df = reduce(lambda DF, value: DF.withColumn(*value), expressions, gemini_df)

        # Stack the previously generated columns:
        unpivot_expression = f"""stack({len(cell_lines)}, {", ".join([f"'{x}', {x}" for x in cell_lines])} ) as (cellLineName, cellLineData)"""

        return (
            res_df
            # Create a consistent id column:
            .withColumn("id", f.regexp_replace(f.col(gene_column), ";", "~"))
            # Unpivot:
            .select("id", f.expr(unpivot_expression))
            # Extracting and renaming gemini statistical values:
            .select(
                "id",
                f.regexp_replace("cellLineName", "_strong", "").alias("cellLineName"),
                f.col("cellLineData.score")
                .cast(t.FloatType())
                .alias("geneticInteractionScore"),
                f.col("cellLineData.pval")
                .cast(t.FloatType())
                .alias("geneticInteractionPValue"),
                f.col("cellLineData.FDR")
                .cast(t.FloatType())
                .alias("geneticInteractionFDR"),
                f.lit("gemini").alias("statisticalMethod"),
            )
        )

    def parse_experiment(self, parameters: dict) -> DataFrame:
        """Parsing experiments based on the experimental descriptions.

        Args:
            parameters: Dictionary of experimental parameters. The following keys are required:
                - dataset: Name of the dataset, e.g. COLO1, referring to the first library set of colo.
                - diseaseFromSource: Name of the disease model of the experiment.
                - diseaseFromSourceMappedId: EFO ID of the disease model of the experiment.
                - logFoldChangeFile: File path to the log fold change file.
                - geminiFile: File path to the gemini file.
                - blissFile: File path to the bliss file.

        Returns:
            A pyspark dataframe of experiment data.

        Process:
            - Reading all files.
            - Joining files.
            - Applying filters.
                - Apply filter on log fold change
                - Apply filter on cooperativity
                - Apply filter on target role (keeping only library genes, controls are dropped)
            - Adding additional columns + finalizing evidence model
        """

        disease_from_source = parameters["diseaseFromSource"]
        disease_from_source_mapped_id = parameters["diseaseFromSourceMappedId"]
        dataset = parameters["dataset"]
        log_fold_change_file = parameters["logFoldChangeFile"]
        # gemini_file = parameters["geminiFile"] # <= Removed as the gemini method is dropped for now.
        blissFile = parameters["blissFile"]

        # Testing if experiment needs to be skipped:
        if parameters["skipStudy"] is True:
            logging.info(f"Skipping study: {dataset}")
            return None

        logging.info(f"Parsing experiment: {dataset}")

        # if no log fold change file is provided, we will not generate any evidence.
        if log_fold_change_file is None:
            logging.warning(f"No log fold change file provided for {dataset}.")
            return None

        # Reading lfc data:
        lfc_file = f"{self.data_folder}/{log_fold_change_file}"
        lfc_df = self.get_lfc_data(lfc_file)

        logging.info(
            f'Number of gene pairs in the log(fold change) dataset: {lfc_df.select("id").distinct().count()}'
        )
        logging.info(
            f'Number of cell lines in the log(fold change) dataset: {lfc_df.select("cellLineName").distinct().count()}'
        )
        # A decision was made to exclude gemini assessments, for now.
        # # Reading gemini data:
        # gemini_file = f"{self.data_folder}/{gemini_file}"
        # gemini_df = self.get_gemini_data(gemini_file)
        # logging.info(
        #     f'Number of gene pairs in the gemini dataset: {gemini_df.select("id").distinct().count()}'
        # )
        # logging.info(
        #     f'Number cell lines in the gemini dataset: {gemini_df.select("cellLineName").distinct().count()}'
        # )

        bliss_file = f"{self.data_folder}/{blissFile}"
        bliss_df = self.get_bliss_data(bliss_file)

        # Merging lfc + bliss data:
        merged_dataset = (
            lfc_df
            # Data is joined by the gene-pair and cell line:
            .join(bliss_df, how="inner", on=["id", "cellLineName"])
            # Applying filters on logFoldChange + interaction p-value thresholds:
            .filter(
                (
                    f.col("phenotypicConsequencePValue")
                    <= self.log_fold_change_cutoff_p_val
                )
                & (f.col("phenotypicConsequenceFDR") <= self.log_fold_change_cutoff_FDR)
                & (f.col("geneticInteractionPValue") <= self.interaction_cutoff_p_val)
            )
            # Cleaning the cell line annotation:
            .withColumn("cellId", f.split(f.col("cellLineName"), "_").getItem(0))
            # Joining with cell passport data containing diseaseCellLine and biomarkers info:
            .join(
                self.cell_passport_df.select(
                    f.col("id").alias("cellId"), "diseaseCellLine", "biomarkerList"
                ),
                on="cellId",
                how="left",
            )
        )
        logging.info(
            f'Number of gene pairs in the merged dataset: {merged_dataset.select("id").count()}'
        )
        logging.info(
            f'Number of cell lines in the merged dataset: {merged_dataset.select("cellLineName").distinct().count()}'
        )

        evidence_df = (
            merged_dataset
            # Parsing and exploding gene names and target roles:
            .withColumn("id", self.parse_targets(f.col("id"), f.col("Note1")))
            .select("*", f.explode(f.col("id")).alias("genes"))
            .select(
                # Adding target related fields:
                f.col("genes.*"),
                # Adding cell line specific annotation:
                f.array(f.col("diseaseCellLine")).alias("diseaseCellLines"),
                "biomarkerList",
                # Adding evidence specific stats:
                "phenotypicConsequenceLogFoldChange",
                "phenotypicConsequencePValue",
                "phenotypicConsequenceFDR",
                "statisticalMethod",
                "geneticInteractionPValue",
                "geneticInteractionScore",
                "geneInteractionType",
                # Adding disease information:
                f.lit(disease_from_source_mapped_id).alias("diseaseFromSourceMappedId"),
                f.lit(disease_from_source).alias("diseaseFromSource"),
                # Adding release related fields:
                f.lit(self.release_version).alias("releaseVersion"),
                f.lit(self.release_date).alias("releaseDate"),
                # Data source specific fields:
                f.lit("ot_partner").alias("datatypeId"),
                f.lit("encore").alias("datasourceId"),
                f.lit("OTAR2062").alias("projectId"),
                f.lit("Encore project").alias("projectDescription"),
            )
            # Dropping all evidence for anchor and control genes:
            .filter(f.col("targetRole") == "library")
            .distinct()
        )

        # If there's a warning message for the study, add it:
        if "warningMessage" in parameters and parameters["warningMessage"] is not None:
            evidence_df = evidence_df.withColumn(
                "warningMessage", f.lit(parameters["warningMessage"])
            )

        return evidence_df


def main(outputFile: str, parameters: dict, cellPassportFile: str) -> None:

    # Initialize spark session
    spark = initialize_sparksession()

    # Opening and parsing the cell passport data from Sanger:
    cell_passport_df = GenerateDiseaseCellLines(cellPassportFile, spark).get_mapping()

    logging.info(f"Cell passport dataframe has {cell_passport_df.count()} rows.")
    logging.info("Parsing experiment data...")

    # Initialising evidence generator:
    evidenceGenerator = EncoreEvidenceGenerator(
        spark, cell_passport_df, parameters["sharedMetadata"]
    )

    # Create evidence for all experiments. Dataframes are collected in a list:
    evidence_dfs = []
    for experiment in parameters["experiments"]:
        evidence_dfs.append(evidenceGenerator.parse_experiment(experiment))

    # Filter out None values, so only dataframes with evidence are kept:
    evidence_dfs = list(filter(None, evidence_dfs))

    # combine all evidence dataframes into one:
    combined_evidence_df = reduce(lambda df1, df2: df1.union(df2), evidence_dfs)

    # The combined evidence is written to a file:
    write_evidence_strings(combined_evidence_df, outputFile)


if __name__ == "__main__":

    # Reading output file name from the command line:
    parser = argparse.ArgumentParser(
        description="This script parses ENCORE data and generates disease-target evidence."
    )
    parser.add_argument(
        "--output_file", "-o", type=str, help="Output file. gzipped JSON", required=True
    )
    parser.add_argument(
        "--parameter_file",
        "-p",
        type=str,
        help="A JSON file describing exeriment metadata",
        required=True,
    )
    parser.add_argument(
        "--data_folder",
        "-d",
        type=str,
        help="A folder in which the data files are located",
        required=True,
    )
    parser.add_argument(
        "--log_file",
        type=str,
        help="File into which the logs are saved",
        required=False,
    )
    parser.add_argument(
        "--cell_passport_file",
        "-c",
        type=str,
        help="File containing cell passport data",
        required=True,
    )
    args = parser.parse_args()

    # If no logfile is specified, logs are written to the standard error:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    if args.log_file:
        # Write logs to the requested file as well; basicConfig above already covers stderr.
        logging.getLogger().addHandler(logging.FileHandler(args.log_file))

    # Validating data folder:
    if not os.path.isdir(args.data_folder):
        logging.error(f"Data folder {args.data_folder} does not exist.")
        raise ValueError(f"Data folder {args.data_folder} does not exist.")

    # Reading parameter json:
    parameters = read_ppp_config(args.parameter_file)

    # Updating parameters:
    parameters["sharedMetadata"]["data_folder"] = args.data_folder

    # Passing all the required arguments:
    main(args.output_file, parameters, args.cell_passport_file)
import argparse
from datetime import datetime
import gzip
import json
import logging
import sys
import pandas as pd

# The statisticalTestTail is inferred by the column name which is being filtered on:
FILTER_COLUMN_MAP = {
    "pos|fdr": "upper tail",
    "neg|fdr": "lower tail",
    "neg|p-value": "lower tail",
    "pos|p-value": "upper tail",
}
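# For example, a study whose filterColumn is "neg|fdr" is assigned statisticalTestTail = "lower tail".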


class OTAR_CRISPR_study_parser(object):
    def __init__(self, study_url: str) -> None:
        self.study_file = study_url

        # Store the study dataframe after dropping problematic studies:
        study_df = (
            pd.read_json(study_url)
            # drop rows with no study id or data file:
            .loc[lambda df: df.studyId.notna() & df.dataFiles.notna()]
        )

        # Test and warn if multiple studies have the same study id:
        duplicated_study_ids = study_df.loc[
            lambda df: df.studyId.duplicated()
        ].studyId.tolist()

        assert (
            len(duplicated_study_ids) == 0
        ), f'Multiple studies have the same study id: {", ".join(duplicated_study_ids)}'

        # Splitting studies if multiple filter columns are provided:
        study_df = (
            study_df
            # Exploding filter columns:
            .assign(filterColumn=lambda df: df.filterColumn.str.split(",")).explode(
                "filterColumn"
            )
        )

        logging.info(f"Number of studies processed: {len(study_df.studyId.unique())}")

        projects = study_df.projectId.unique()
        logging.info(f'Number of projects: {len(projects)} ({", ".join(projects)})')

        self.study_df = study_df

    def generate_evidence(self, data_folder: str) -> None:
        # Looping through the studies and generating evidence:
        # Reading all data files and filter for significant hits:
        study_columns = [
            "releaseDate",
            "releaseVersion",
            "studyId",
            "dataFiles",
            "dataFileType",
            "filterColumn",
            "threshold",
            "projectId",
            "ControlDataset",
            "projectDescription",
        ]

        # hits is a pd.Series with pd.DataFrames as values.
        hits = (
            self.study_df[study_columns]
            .explode("dataFiles")
            .assign(
                dataFile=lambda df: df.apply(
                    lambda x: f'{data_folder}/{x["projectId"]}/{x["dataFiles"]}', axis=1
                )
            )
            .assign(
                ControlDataset=lambda df: df.apply(
                    lambda x: f'{data_folder}/{x["projectId"]}/{x["ControlDataset"]}'
                    if pd.notna(x["ControlDataset"])
                    else None,
                    axis=1,
                )
            )
            # TODO: parsing the data files should be file type dependent!
            # The following apply returns pd.DataFrames:
            .apply(self.parse_MAGeCK_file, axis=1)
        )

        # Concatenate all hits into one single dataframe:
        hits_df = (
            pd.concat(hits.to_list()).reset_index(drop=True)
            # Cleaning gene id column:
            .assign(
                targetFromSourceId=lambda df: df.targetFromSourceId.apply(
                    self.cleaning_gene_id
                )
            )
        )

        # Merging:
        evidence_fields = [
            "targetFromSourceId",
            "diseaseFromSourceMappedId",
            "projectDescription",
            "projectId",
            "studyId",
            "studyOverview",
            "contrast",
            "crisprScreenLibrary",
            "cellType",
            "cellLineBackground",
            "geneticBackground",
            "statisticalTestTail",
            "resourceScore",
            "log2FoldChangeValue",
            "releaseDate",
            "releaseVersion",
        ]
        self.merged_dataset = (
            self.study_df.assign(
                statisticalTestTail=lambda df: df.filterColumn.map(FILTER_COLUMN_MAP)
            )
            .merge(hits_df, on=["studyId", "filterColumn"], how="inner")
            .explode("diseaseFromSourceMappedId")
            .filter(items=evidence_fields)
            .assign(datasourceId="ot_crispr", datatypeId="ot_partner")
        )

    @staticmethod
    def cleaning_gene_id(gene_id: str) -> str:
        """Expandable set of string processing steps to clean gene identifiers.

        Examples:
            >>> cleaning_gene_id("ENSG00000187123_LYPD6")
            >>> "ENSG00000187123"
        """

        # ENSG00000187123_LYPD6 -> ENSG00000187123
        gene_id = gene_id.split("_")[0]

        return gene_id

    @staticmethod
    def parse_MAGeCK_file(row: pd.Series) -> pd.DataFrame:
        """This function returns a pandas dataframe with the datafile and with properly named columns"""

        datafile = row["dataFile"]
        filterColumn = row["filterColumn"]
        threshold = float(row["threshold"])
        studyId = row["studyId"]
        controlDataFile = row["ControlDataset"]

        # Which end of the distribution are we looking at - "neg" or "pos"?
        side = filterColumn.split("|")[0]

        # Read data, filter and rename columns:
        mageck_df = (
            pd.read_csv(datafile, sep="\t")
            .rename(
                columns={
                    filterColumn: "resourceScore",
                    "id": "targetFromSourceId",
                    # Extracting log fold change for the relevant direction:
                    f"{side}|lfc": "log2FoldChangeValue",
                }
            )
            .loc[lambda df: df.resourceScore <= threshold][
                ["targetFromSourceId", "resourceScore", "log2FoldChangeValue"]
            ]
            .assign(studyId=studyId, filterColumn=filterColumn)
        )

        # Applying control if present:
        if pd.isna(controlDataFile):
            logging.info(f"Number of genes reach threshold: {len(mageck_df)}")
            return mageck_df

        # Read control data, filter and rename columns:
        logging.info(f"Reading control data file: {controlDataFile}")
        controlHits = (
            pd.read_csv(controlDataFile, sep="\t")
            .rename(columns={filterColumn: "resourceScore", "id": "targetFromSourceId"})
            .loc[lambda df: df.resourceScore <= threshold]["targetFromSourceId"]
            .tolist()
        )

        # Excluding control genes:
        mageck_df = mageck_df.loc[lambda df: ~df.targetFromSourceId.isin(controlHits)]

        logging.info(f"Number of genes reach threshold: {len(mageck_df)}")
        return mageck_df

    def write_evidence(self, output_file: str) -> None:
        """Write the merged evidence to file"""

        json_list = [
            json.dumps(row.dropna().to_dict())
            for _, row in self.merged_dataset.iterrows()
        ]
        with gzip.open(output_file, "wt") as f:
            f.write("\n".join(json_list) + "\n")


def main(study_table, output_file, data_folder) -> None:
    # Parsing study table:
    parser = OTAR_CRISPR_study_parser(study_table)

    # Read data:
    parser.generate_evidence(data_folder)

    # Save data:
    parser.write_evidence(output_file)


if __name__ == "__main__":
    # Parsing arguments:
    parser = argparse.ArgumentParser(
        description="Script to parse internally generated datasets for OTAR projects."
    )

    parser.add_argument(
        "-s",
        "--study_table",
        type=str,
        required=True,
        help="A JSON file with study level metadata.",
    )
    parser.add_argument(
        "-o",
        "--output",
        required=False,
        type=str,
        help="Output json.gz file with the evidece.",
    )
    parser.add_argument(
        "-l",
        "--log_file",
        required=False,
        type=str,
        help="Logs are saved into this file.",
    )
    parser.add_argument(
        "-d",
        "--data_folder",
        required=True,
        type=str,
        help="Folder with the data files.",
    )
    args = parser.parse_args()

    study_table = args.study_table
    log_file = args.log_file
    data_folder = args.data_folder

    if not args.output:
        output_file = datetime.today().strftime("otar_crispr-%Y-%m-%d.json.gz")
    else:
        output_file = args.output

    # Configure logger:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    if log_file:
        # Write logs to the requested file as well; basicConfig above already covers stderr.
        logging.getLogger().addHandler(logging.FileHandler(log_file))

    # Report input data:
    logging.info(f"Study information is read from: {study_table}")
    logging.info(f"Evidence saved to: {output_file}")
    logging.info(f"Data files read from: {data_folder}")

    # Process data:
    main(study_table, output_file, data_folder)
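
For orientation, a minimal, hypothetical study-table entry consistent with the columns read by the parser above could look like the sketch below. All values are illustrative placeholders rather than real project data, and actual study tables may carry additional fields:

# Hypothetical study-table record (illustrative placeholders only; field names match those used by the parser above).
example_study = {
    "studyId": "EXAMPLE_STUDY",
    "projectId": "OTAR000",
    "projectDescription": "Example OTAR project",
    "dataFiles": ["screen_results.gene_summary.txt"],
    "dataFileType": "MAGeCK",
    "filterColumn": "neg|fdr",
    "threshold": 0.05,
    "ControlDataset": None,
    "diseaseFromSourceMappedId": ["EFO_0000000"],
    "releaseDate": "2021-01-01",
    "releaseVersion": "1.0",
}
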
import argparse
import logging
import sys
from functools import reduce

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t
from pyspark.sql.dataframe import DataFrame

from common.evidence import (
    write_evidence_strings,
    initialize_sparksession,
    read_ppp_config,
)


# Datasource-wide constants:
VALIDATION_LAB_DATASOURCE_ID = "ot_crispr_validation"
VALIDATION_LAB_DATATYPE_ID = "ot_validation_lab"

# This is a map that provides a recipe to generate the biomarker objects.
# If a value cannot be found in the map, a warning is logged and None is returned.
BIOMARKERMAPS = {
    "PAN": {
        "direct_mapping": {
            "CO": {"name": "PAN-CO", "description": "Pan-colorectal carcinoma"}
        }
    },
    "MS_status": {
        "direct_mapping": {
            "MSI": {"name": "MSI", "description": "Microsatellite instable"},
            "MSS": {"name": "MSS", "description": "Microsatellite stable"},
        }
    },
    "CRIS_subtype": {
        "direct_mapping": {
            "A": {
                "name": "CRIS-A",
                "description": "mucinous, glycolytic, enriched for microsatellite instability or KRAS mutations.",
            },
            "B": {
                "name": "CRIS-B",
                "description": "TGF-β pathway activity, epithelial-mesenchymal transition, poor prognosis.",
            },
            "C": {
                "name": "CRIS-C",
                "description": "elevated EGFR signalling, sensitivity to EGFR inhibitors.",
            },
            "D": {
                "name": "CRIS-D",
                "description": "WNT activation, IGF2 gene overexpression and amplification.",
            },
            "E": {
                "name": "CRIS-E",
                "description": "Paneth cell-like phenotype, TP53 mutations.",
            },
            "?": {"name": "CRIS-?", "description": "CRIS subtype undetermined."},
        }
    },
    "KRAS_status": {
        "description": "KRAS mutation status: ",
        "name": "KRAS-",
    },
    "TP53_status": {
        "description": "TP53 mutation status: ",
        "name": "TP53-",
    },
    "APC_status": {
        "description": "APC mutation status: ",
        "name": "APC-",
    },
    "BRAF_status": {"description": "BRAF mutation status: ", "name": "BRAF-"},
}
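# Keys with a "direct_mapping" entry are looked up verbatim; the *_status keys are combined with the
# observed call ("wt"/"mut") by get_biomarker() below to build the biomarker name and description.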


class ParseHypotheses:
    def __init__(self, spark) -> None:
        self.spark = spark

    def parse_hypotheses(self, expectedFile: str, observedFile: str) -> DataFrame:
        """
        Hypothesis is parsed from two files describing the expected and observed results.
        This function reads the files, compares them and parses each hypothesis as a biomarker plus its status.

        Args:
            expectedFile: file with the expected results
            observedFile: file with the observed results
        Returns:
            DataFrame with the following schema:

            |-- gene: string (nullable = true)
            |-- hypotheses: array (nullable = false)
            |    |-- element: struct (containsNull = false)
            |    |    |-- name: string (nullable = true)
            |    |    |-- description: string (nullable = true)
            |    |    |-- status: string (nullable = false)
        """

        # The observed and expected results follow the same schema and are parsed the same way:
        expected_df = self.read_hypothesis_data(expectedFile, "expected")
        observed_df = self.read_hypothesis_data(observedFile, "observed")

        return (
            expected_df
            # Joining expected vs observed hypothesis tables:
            .join(observed_df, on=["gene", "hypothesis"], how="inner")
            # Filter hypotheses where at least one was True:
            # .filter(col('expected') | col('observed'))
            # From the hypothesis column, e.g. CRIS_subtype-B, extract the type (CRIS_subtype) and the call (B):
            .withColumn(
                "hypothesis_type", f.element_at(f.split(f.col("hypothesis"), "-"), 1)
            )
            .withColumn(
                "hypothesis_call", f.element_at(f.split(f.col("hypothesis"), "-"), 2)
            )
            # Using the biomarker parser, generate a struct similar to the biomarker object:
            .withColumn(
                "hypothesis",
                get_biomarker(f.col("hypothesis_type"), f.col("hypothesis_call")),
            )
            # Besides the annotation we add the status of the hypothesis:
            .withColumn(
                "status",
                f.when(f.col("expected") & f.col("observed"), "observed and expected")
                .when(f.col("expected"), "expected but not observed")
                .when(f.col("observed"), "observed but not expected")
                .otherwise("not expected and not observed"),
            )
            .withColumn("hypothesis", f.struct("hypothesis.*", "status"))
            # Collect all hypotheses for each gene:
            .groupBy("gene")
            .agg(f.collect_set("hypothesis").alias("validationHypotheses"))
            .persist()
        )

    def read_hypothesis_data(self, file: str, call: str) -> DataFrame:
        """Parsing the hypothesis file.

        Args:
            file: hypothesis file tsv with genes in rows and biomarkers in columns, the hypotheses are boolean.
        Returns:
            DataFrame with the following columns: gene, hypothesis, call (true/false)
        """

        hypothesis_df = (
            self.spark.read.csv(file, sep="\t", header=True).withColumnRenamed(
                "Gene", "gene"
            )
            # The gene names are manually typed; there are both lower and upper case names:
            .withColumn("gene", f.upper(f.col("gene")))
        )

        # The first column is the gene name, the rest are the hypotheses:
        hypothesis_columns = hypothesis_df.columns[1:]

        unpivot_expression = f"""stack({len(hypothesis_columns)}, {", ".join([f"'{x}', `{x}`" for x in hypothesis_columns])} ) as (hypothesis, {call})"""

        return (
            hypothesis_df.select("Gene", f.expr(unpivot_expression))
            .withColumn(call, f.col(call).cast(t.BooleanType()))
            .persist()
        )


@f.udf(
    t.StructType(
        [
            t.StructField("name", t.StringType(), False),
            t.StructField("description", t.StringType(), False),
        ]
    )
)
def get_biomarker(column_name, biomarker):
    """This function returns with a struct with the biomarker name and description"""

    # If the biomarker has a direct mapping:
    if "direct_mapping" in BIOMARKERMAPS[column_name]:
        try:
            return BIOMARKERMAPS[column_name]["direct_mapping"][biomarker]
        except KeyError:
            logging.warning(
                f"Could not find direct mapping for {column_name}:{biomarker}"
            )
            return None

    # If the value needs to be parsed:
    if biomarker == "wt":
        return {
            "name": BIOMARKERMAPS[column_name]["name"] + biomarker,
            "description": BIOMARKERMAPS[column_name]["description"] + "wild type",
        }
    elif biomarker == "mut":
        return {
            "name": BIOMARKERMAPS[column_name]["name"] + biomarker,
            "description": BIOMARKERMAPS[column_name]["description"] + "mutant",
        }
    else:
        logging.warning(f"Could not find direct mapping for {column_name}: {biomarker}")
        return None
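
# Note on BIOMARKERMAPS (defined elsewhere in this module): the UDF above assumes that each entry
# either provides a "direct_mapping" from raw values to {name, description} structs, or provides
# "name"/"description" prefixes that are combined with the "wt"/"mut" value. A purely illustrative
# (hypothetical) entry could look like this:
#
#   BIOMARKERMAPS = {
#       "MS_status": {
#           "direct_mapping": {
#               "MSI": {"name": "MSI", "description": "Microsatellite instable"},
#               "MSS": {"name": "MSS", "description": "Microsatellite stable"},
#           }
#       },
#       "KRAS": {"name": "KRAS ", "description": "KRAS is "},
#   }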


def get_cell_passport_data(spark: SparkSession, cell_passport_file: str) -> DataFrame:
    """Load cell line annotation data (cell model passports) from Sanger and normalise the cell line names."""

    # Loading cell line annotation data from Sanger:
    return (
        spark.read.option("multiline", True)
        .csv(cell_passport_file, header=True, sep=",", quote='"')
        .select(
            f.regexp_replace(f.col("model_name"), r"-", "").alias("cellName"),
            f.col("model_id").alias("cellId"),
            f.col("tissue"),
        )
        .persist()
    )


def parse_experiment(
    spark: SparkSession, parameters: dict, cellPassportDf: DataFrame, data_folder: str
) -> DataFrame:
    """
    Parse experiment data from a file.

    Args:
        spark: Spark session.
        parameters: Dictionary of experimental parameters.
        cellPassportDf: Dataframe of cell passport data.
        data_folder: Location of the input data files.

    Returns:
        A dataframe of experiment data.
    """

    # Extracting parameters:
    experiment_file = f"{data_folder}/{parameters['experimentData']}"
    contrast = parameters["contrast"]
    studyOverview = parameters["studyOverview"]
    projectId = parameters["projectId"]
    projectDescription = parameters["projectDescription"]
    diseaseFromSource = parameters["diseaseFromSource"]
    diseaseFromSourceMapId = parameters["diseaseFromSourceMappedId"]
    confidenceCutoff = parameters["confidenceCutoff"]
    cell_line_file = f"{data_folder}/{parameters['cellLineFile']}"
    tissue_id = parameters["tissueId"]

    # The hypothesis is defined by two datasets:
    hypothesis_expected_file = f"{data_folder}/{parameters['hypothesisFileExpected']}"
    hypothesis_observed_file = f"{data_folder}/{parameters['hypothesisFileObserved']}"
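    # For reference, one experiment entry in the PPP configuration therefore needs to provide at
    # least the keys extracted above; an illustrative (hypothetical) entry:
    #   {
    #       "experimentData": "experiment_results.tsv",
    #       "contrast": "treatment vs control",
    #       "studyOverview": "...",
    #       "projectId": "...",
    #       "projectDescription": "...",
    #       "diseaseFromSource": "...",
    #       "diseaseFromSourceMappedId": "...",
    #       "confidenceCutoff": 0.5,
    #       "cellLineFile": "cell_lines.tsv",
    #       "tissueId": "...",
    #       "hypothesisFileExpected": "hypothesis_expected.tsv",
    #       "hypothesisFileObserved": "hypothesis_observed.tsv",
    #   }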

    # Reading cell metadata from validation lab:
    validation_lab_cell_lines = (
        spark.read.csv(cell_line_file, sep="\t", header=True)
        # Renaming columns:
        .withColumnRenamed("cell_line", "cellName")
        .drop("tissue")
        # Joining the dataset with cell model data downloaded from the Sanger website:
        .join(cellPassportDf, on="cellName", how="left")
        # Adding the UBERON tissue code (a constant, colon, for this dataset):
        .withColumn("tissueID", f.lit(tissue_id))
        # Generating the diseaseCellLines object:
        .withColumn(
            "diseaseCellLines",
            f.array(
                f.struct(
                    f.col("cellName").alias("name"),
                    f.col("cellId").alias("id"),
                    f.col("tissue"),
                    f.lit(tissue_id).alias("tissueId"),
                )
            ),
        )
        .persist()
    )

    logging.info(
        f"The validation lab cell line dataset contains {validation_lab_cell_lines.count()} cell lines."
    )

    # Defining how to process biomarkers:
    # 1. Loop through all possible biomarkers, taken from BIOMARKERMAPS.keys().
    # 2. Each biomarker is then looked up in the map and processed as the map defines.
    # 3. A description, also read from the map, is added.
    biomarkers_in_data = [
        biomarker
        for biomarker in BIOMARKERMAPS.keys()
        if biomarker in validation_lab_cell_lines.columns
    ]

    expressions = map(
        # Function to process biomarker:
        lambda biomarker: (
            biomarker,
            get_biomarker(f.lit(biomarker), f.col(biomarker)),
        ),
        # Iterator to apply the function over:
        biomarkers_in_data,
    )

    # Applying the full map on the dataframe one-by-one:
    biomarkers = reduce(
        lambda DF, value: DF.withColumn(*value), expressions, validation_lab_cell_lines
    )
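    # The reduce above is equivalent to chaining one withColumn call per biomarker column found in
    # the data, e.g. (hypothetical biomarker name):
    #   validation_lab_cell_lines.withColumn(
    #       "MS_status", get_biomarker(f.lit("MS_status"), f.col("MS_status"))
    #   )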

    # The biomarker columns are unstacked into one single 'biomarkers' column:
    biomarker_unstack = f"""stack({len(biomarkers_in_data)}, {", ".join([f"'{x}', {x}" for x in biomarkers_in_data])}) as (biomarker_name, biomarkers)"""
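    # For example (hypothetical biomarker names), with biomarkers_in_data == ["MS_status", "KRAS"]
    # the expression above expands to:
    #   stack(2, 'MS_status', MS_status, 'KRAS', KRAS) as (biomarker_name, biomarkers)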

    validation_lab_cell_lines = (
        biomarkers
        # Selecting cell line name, cell line annotation and applying the stacking expression:
        .select(
            f.col("cellName"),
            "diseaseCellLines",
            f.expr(biomarker_unstack),
        )
        # Filter out all null biomarkers:
        .filter(
            (f.col("biomarkers").isNotNull())
            &
            # Following the request of the validation lab, we are removing CRIS biomarker annotation:
            (f.col("biomarker_name") != "CRIS_subtype")
        )
        # Grouping data by cell lines again:
        .groupBy("cellName")
        .agg(
            f.collect_list("biomarkers").alias("biomarkers"),
            f.first(f.col("diseaseCellLines")).alias("diseaseCellLines"),
        )
        .persist()
    )

    # Reading and processing hypothesis data:
    hypothesis_generator = ParseHypotheses(spark)
    validation_hypotheses_df = hypothesis_generator.parse_hypotheses(
        expectedFile=hypothesis_expected_file, observedFile=hypothesis_observed_file
    )

    # Reading experiment data from validation lab:
    evidence = (
        # Reading evidence:
        spark.read.csv(experiment_file, sep="\t", header=True)
        .withColumnRenamed("cell_line", "cellName")
        # Genes need to be uppercase:
        .withColumn("gene", f.upper(f.col("gene")))
        # Joining hypothesis data:
        .join(validation_hypotheses_df, on="gene", how="left")
        # Joining with cell line data:
        .join(validation_lab_cell_lines, on="cellName", how="left")
        # Selecting and constructing the evidence fields:
        .select(
            f.col("gene").alias("targetFromSourceId"),
            f.col("validationHypotheses"),
            f.when(
                f.col("effect_size").cast("double") > 0,
                f.col("effect_size").cast("double"),
            )
            .otherwise(0)
            .alias("resourceScore"),
            f.when(
                f.col("effect_size").cast("double") >= confidenceCutoff,
                f.lit("significant"),
            )
            .otherwise(f.lit("not significant"))
            .alias("confidence"),
            f.when(f.col("expected_to_pass") == "TRUE", f.lit("significant"))
            .otherwise(f.lit("not significant"))
            .alias("expectedConfidence"),
            f.lit("upper tail").alias("statisticalTestTail"),
            f.lit(contrast).alias("contrast"),
            f.lit(studyOverview).alias("studyOverview"),
            f.lit(diseaseFromSourceMapId).alias("diseaseFromSourceMappedId"),
            f.lit(diseaseFromSource).alias("diseaseFromSource"),
            f.lit(projectId).alias("projectId"),
            f.lit(projectDescription).alias("projectDescription"),
            f.col("biomarkers").alias("biomarkerList"),
            f.col("diseaseCellLines"),
            f.lit(VALIDATION_LAB_DATATYPE_ID).alias("datatypeId"),
            f.lit(VALIDATION_LAB_DATASOURCE_ID).alias("datasourceId"),
        )
        .persist()
    )

    logging.info(f"Evidence count: {evidence.count()}.")
    return evidence


def main(
    config_file: str, output_file: str, cell_passport_file: str, data_folder: str
) -> None:

    # Initialize spark session
    spark = initialize_sparksession()

    # Parse experimental parameters:
    parameters = read_ppp_config(config_file)

    # Opening and parsing the cell passport data from Sanger:
    cell_passport_df = get_cell_passport_data(spark, cell_passport_file)

    logging.info(f"Cell passport dataframe has {cell_passport_df.count()} rows.")

    logging.info("Parsing experiment data...")

    # Create evidence for all experiments:
    evidence_dfs = []
    for experiment in parameters["experiments"]:
        evidence_dfs.append(
            parse_experiment(spark, experiment, cell_passport_df, data_folder)
        )

    # combine all evidence dataframes into one:
    combined_evidence_df = reduce(lambda df1, df2: df1.union(df2), evidence_dfs)

    # Save the combined evidence dataframe:
    write_evidence_strings(combined_evidence_df, output_file)


if __name__ == "__main__":

    # Reading output file name from the command line:
    parser = argparse.ArgumentParser(
        description="This script parse validation lab data and generates disease target evidence."
    )
    parser.add_argument(
        "--output_file", "-o", type=str, help="Output file. gzipped JSON", required=True
    )
    parser.add_argument(
        "--parameter_file",
        "-i",
        type=str,
        help="A JSON file describing exeriment metadata",
        required=True,
    )
    parser.add_argument(
        "--log_file",
        type=str,
        help="File into which the logs are saved",
        required=False,
    )
    parser.add_argument(
        "--cell_passport_file", type=str, help="Cell passport file", required=True
    )
    parser.add_argument(
        "--data_folder",
        type=str,
        help="Location of input files to process.",
        required=True,
    )
    args = parser.parse_args()

    # Logs are written to the specified log file, or to standard error if no log file is given:
    logging_kwargs = {
        "level": logging.INFO,
        "format": "%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s",
        "datefmt": "%Y-%m-%d %H:%M:%S",
    }
    if args.log_file:
        logging_kwargs["filename"] = args.log_file
    logging.basicConfig(**logging_kwargs)

    # Passing all the required arguments:
    main(
        args.parameter_file, args.output_file, args.cell_passport_file, args.data_folder
    )
shell:
    "sed -n 's/^rule//p' {input}"
run:
    # The way this works is that remote filenames are actually represented by Snakemake as pseudo-local files.
    # Snakemake will catch the "os.rename" (the same way it would have caught a "mv" call) and proceed with
    # uploading the files.
    for local_filename, remote_filename in zip(input, output[:-1]):
        os.rename(local_filename, remote_filename)
    # Compress, timestamp, and upload the logs.
    with tarfile.open(output[-1], 'w:gz') as archive:
        for filename in os.listdir('log'):
            archive.add(os.path.join('log', filename))
shell:
    """
    # In this and the following rules, the exec call redirects the output of all subsequent commands (both STDOUT
    # and STDERR) to the specified log file.
    exec &> {log}
    python modules/cancerBiomarkers.py \
      --biomarkers_table {input.biomarkers_table} \
      --source_table {input.source_table} \
      --disease_table {input.disease_table} \
      --drug_index {input.drug_index} \
      --output_file {output}
    opentargets_validator --schema {params.schema} {output}
    """
shell:
    """
    exec &> {log}
    python modules/ChEMBL.py  \
        --chembl_evidence {input.evidenceFile} \
        --predictions {input.stopReasonCategories} \
        --output {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output}
    """
SnakeMake From line 120 of master/Snakefile
shell:
    """
    exec &> {log}
    # Download directly because HTTP RemoteProvider does not handle retries correctly.
    wget -q -O clingen_summary.csv {params.summaryTableWeb}
    # Retain the original summary table and store that in GCS.
    cp clingen_summary.csv {output.summaryTable}
    python modules/ClinGen.py \
      --input_file clingen_summary.csv \
      --output_file {output.evidenceFile} \
      --cache_dir {params.cacheDir} \
      --local
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
SnakeMake From line 140 of master/Snakefile
shell:
    """
    exec &> {log}
    python modules/CRISPR.py \
      --evidence_file {input.evidenceFile} \
      --descriptions_file {input.descriptionsFile} \
      --cell_types_file {input.cellTypesFile} \
      --output_file {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
SnakeMake From line 166 of master/Snakefile
shell:
    """
    exec &> {log}
    python modules/GeneBurden.py \
        --az_binary_data {input.azPhewasBinary} \
        --az_quant_data {input.azPhewasQuant} \
        --curated_data {input.curation} \
        --genebass_data {input.genebass} \
        --output {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
SnakeMake From line 189 of master/Snakefile
shell:
    """
    exec &> {log}
    # Retain the inputs. They will be later uploaded to GCS.
    cp {input.ddPanel} {output.ddBucket}
    cp {input.eyePanel} {output.eyeBucket}
    cp {input.skinPanel} {output.skinBucket}
    cp {input.cancerPanel} {output.cancerBucket}
    cp {input.cardiacPanel} {output.cardiacBucket}
    python modules/Gene2Phenotype.py \
      --dd_panel {input.ddPanel} \
      --eye_panel {input.eyePanel} \
      --skin_panel {input.skinPanel} \
      --cancer_panel {input.cancerPanel} \
      --cardiac_panel {input.cardiacPanel} \
      --output_file {output.evidenceFile} \
      --cache_dir {params.cacheDir} \
      --local
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
SnakeMake From line 220 of master/Snakefile
shell:
    """
    exec &> {log}
    python modules/IntOGen.py \
      --inputGenes {input.inputGenes} \
      --inputCohorts {input.inputCohorts} \
      --diseaseMapping {input.diseaseMapping} \
      --outputFile {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
SnakeMake From line 252 of master/Snakefile
shell:
    """
    exec &> {log}
    python modules/Orphanet.py \
      --input_file {input} \
      --output_file {output} \
      --cache_dir {params.cacheDir}
    opentargets_validator --schema {params.schema} {output}
    """
SnakeMake From line 273 of master/Snakefile
shell:
    """
    exec &> {log}
    python modules/PanelApp.py \
      --input-file {input.inputFile} \
      --output-file {output.evidenceFile} \
      --cache_dir {params.cacheDir}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
SnakeMake From line 293 of master/Snakefile
shell:
    """
    exec &> {log}
    python modules/IMPC.py \
      --cache-dir {params.cacheDir} \
      --output-evidence {output.evidenceFile} \
      --output-mouse-phenotypes {output.mousePhenotypes} \
      --score-cutoff 41
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
SnakeMake From line 312 of master/Snakefile
shell:
    """
    exec &> {log}
    python modules/PROGENY.py \
      --inputFile {input.inputFile} \
      --diseaseMapping {input.diseaseMapping} \
      --pathwayMapping {input.pathwayMapping} \
      --outputFile {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
SnakeMake From line 334 of master/Snakefile
shell:
    """
    exec &> {log}
    python modules/SLAPEnrich.py \
      --inputFile {input.inputFile} \
      --diseaseMapping {input.diseaseMapping} \
      --outputFile {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
SnakeMake From line 355 of master/Snakefile
shell:
    """
    exec &> {log}
    python modules/SystemsBiology.py \
      --evidenceFile {input.evidenceFile} \
      --studyFile {input.studyFile} \
      --outputFile {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
SnakeMake From line 375 of master/Snakefile
shell:
    """
    exec &> {log}
    python modules/TEP.py  \
      --output_file {output}
    opentargets_validator --schema {params.schema} {output}
    """
SnakeMake From line 393 of master/Snakefile
shell:
    """
    exec &> {log}
    python modules/crispr_screens.py  \
        --crispr_brain_mapping {params.crispr_brain_mapping} \
        --output {output}
    opentargets_validator --schema {params.schema} {output}
    """
SnakeMake From line 409 of master/Snakefile
shell:
    """
    exec &> {log}
    python modules/TargetSafety.py  \
        --adverse_events {params.ae} \
        --safety_risk {params.sr} \
        --toxcast {input.toxcast} \
        --aopwiki {input.aopwiki} \
        --output {output}
    opentargets_validator --schema {params.schema} {output}
    """
SnakeMake From line 431 of master/Snakefile
shell:
    """
    exec &> {log}
    # Retain the inputs. They will be later uploaded to GCS.
    cp {input.rawProbesExcel} {output.rawProbesExcel}
    cp {input.probesXrefsTable} {output.probesXrefsTable}
    python modules/chemicalProbes.py \
        --probes_excel_path {input.rawProbesExcel} \
        --probes_mappings_path {input.probesXrefsTable} \
        --output {output.evidenceFile}
    opentargets_validator --schema {params.schema} {output.evidenceFile}
    """
SnakeMake From line 455 of master/Snakefile
shell:
    """
    exec &> {log}
    python partner_preview_scripts/ot_crispr.py \
        --study_table {params.study_table} \
        --data_folder {params.data_folder} \
        --output {output}
    opentargets_validator --schema {params.schema} {output}
    """
SnakeMake From line 477 of master/Snakefile
shell:
    """
    exec &> {log}
    python partner_preview_scripts/encore_parser.py \
        --output_file {output} \
        --parameter_file {params.config} \
        --data_folder {params.data_folder} \
        --cell_passport_file {input.cell_passport_table}
    opentargets_validator --schema {params.schema} {output}
    """
SnakeMake From line 498 of master/Snakefile
shell:
    """
    exec &> {log}
    python partner_preview_scripts/ValidationLab.py \
        --parameter_file {params.config} \
        --data_folder {params.data_folder} \
        --cell_passport_file {input.cell_passport_table} \
        --output_file {output}
    opentargets_validator --schema {params.schema} {output}
    """
SnakeMake From line 520 of master/Snakefile
run:
    for source in PPP_sources:
        os.rename(source[0], source[1])
SnakeMake From line 536 of master/Snakefile
Created: 1yr ago
Updated: 1yr ago
Maintainers: public
URL: https://github.com/opentargets/evidence_datasource_parsers
Name: evidence_datasource_parsers
Version: 2.1.1
Downloaded: 0
Copyright: Public Domain
License: Apache License 2.0