Neo4j Data Integration and Build Pipeline for EpiGraphDB Graph Creation


Neo4j data integration and build pipeline - https://github.com/elswob/neo4j-build-pipeline

This pipeline originated from the work done to create the graph for EpiGraphDB. With over 20 separate data sets to process and load, a single, reproducible pipeline was needed to standardise the data preparation and graph build steps.

Code Snippets

Snakefile, lines 39-70:
run:
    # assumes os, yaml and jsonschema's validate are imported at the top of the Snakefile
    #open output file
    o = open(output[0], "w")
    #validate data integration config file
    if NODEDIR in config:
        nodes = config[NODEDIR]
        for i in nodes:
            o.write(f"integration node {i}\n")
            validate(nodes[i], os.path.join(os.getcwd(), DATA_CONFIG_SCHEMA))
    if RELDIR in config:
        rels = config[RELDIR]
        for i in rels:
            o.write(f"integration rel {i}\n")
            validate(rels[i], os.path.join(os.getcwd(), DATA_CONFIG_SCHEMA))

    #validate db schema config file
    with open(os.path.join(CONFIG_PATH, "db_schema.yaml")) as file:
        db_schema = yaml.load(file, Loader=yaml.FullLoader)
        if 'meta_nodes' in db_schema:
            nodes = db_schema['meta_nodes']
            for i in nodes:
                o.write(f"schema node {i}\n")
                validate(nodes[i], os.path.join(os.getcwd(), DB_SCHEMA_NODES_SCHEMA))
        else:
            print('The db schema has no nodes!')
            exit()
        if 'meta_rels' in db_schema:
            rels = db_schema['meta_rels']
            for i in rels:
                o.write(f"schema rel {i}\n")
                validate(rels[i], os.path.join(os.getcwd(), DB_SCHEMA_RELS_SCHEMA))
    o.close()
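
The rule above validates each config entry against a JSON Schema file. A minimal standalone sketch of the same pattern, where the file name and the cut-down META_NODE_SCHEMA are illustrative assumptions, not the pipeline's actual schema files:

# Minimal sketch of the validation pattern used above.
# "db_schema.yaml" and META_NODE_SCHEMA are illustrative assumptions.
import yaml
from jsonschema import validate, ValidationError

META_NODE_SCHEMA = {
    "type": "object",
    "required": ["properties"],
    "properties": {
        "properties": {"type": "object"},
        "index": {"type": "string"},
    },
}

with open("db_schema.yaml") as f:
    db_schema = yaml.safe_load(f)

for name, meta_node in db_schema.get("meta_nodes", {}).items():
    try:
        validate(meta_node, META_NODE_SCHEMA)
        print(f"schema node {name} ok")
    except ValidationError as e:
        print(f"schema node {name} failed: {e.message}")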
Snakefile, lines 85-116:
shell:
    """
    echo 'Deleting {params.NEO4J_IMPORTDIR}/{params.NODEDIR}/merged/*'
    rm -f {params.NEO4J_IMPORTDIR}/{params.NODEDIR}/merged/*
    echo 'Deleting {params.NEO4J_IMPORTDIR}/{params.NODEDIR}/created.txt'
    rm -f {params.NEO4J_IMPORTDIR}/{params.NODEDIR}/created.txt
    echo 'Deleting {params.NEO4J_IMPORTDIR}/{params.RELDIR}/created.txt'
    rm -f {params.NEO4J_IMPORTDIR}/{params.RELDIR}/created.txt
    echo 'Deleting {params.SNAKEMAKELOGS}/{params.NODEDIR}/*.log'
    if [ -d {params.SNAKEMAKELOGS}/{params.NODEDIR} ]; then find {params.SNAKEMAKELOGS}/{params.NODEDIR} -name "*.log" -delete; fi
    echo 'Deleting {params.SNAKEMAKELOGS}/{params.RELDIR}/*.log'
    if [ -d {params.SNAKEMAKELOGS}/{params.RELDIR} ]; then find {params.SNAKEMAKELOGS}/{params.RELDIR} -name "*.log" -delete; fi
    echo 'Deleting {params.NEO4J_IMPORTDIR}/master*'
    rm -f {params.NEO4J_IMPORTDIR}/master*
    echo 'Deleting {params.SNAKEMAKELOGS}/*.log'
    rm -f {params.SNAKEMAKELOGS}/*.log
    echo 'Deleting {params.NEO4J_IMPORTDIR}/logs/*'
    rm -f {params.NEO4J_IMPORTDIR}/logs/*
    #not sure if below is too severe
    echo 'Deleting *.csv.gz and *import-nodes.txt files in {params.NEO4J_IMPORTDIR}/{params.NODEDIR}'
    if [ -d {params.NEO4J_IMPORTDIR}/{params.NODEDIR} ]; 
        then find {params.NEO4J_IMPORTDIR}/{params.NODEDIR} -name "*.csv.gz" -delete -o -name "*import-nodes.txt" -delete; 
    else
        echo "{params.NEO4J_IMPORTDIR}/{params.NODEDIR} is missing"
    fi
    echo 'Deleting *.csv.gz and *import-rels.txt files in {params.NEO4J_IMPORTDIR}/{params.RELDIR}'
    if [ -d {params.NEO4J_IMPORTDIR}/{params.RELDIR} ]; 
        then find {params.NEO4J_IMPORTDIR}/{params.RELDIR} -name "*.csv.gz" -delete -o -name "*import-rels.txt" -delete; 
    else
        echo "{params.NEO4J_IMPORTDIR}/{params.RELDIR} is missing"
    fi
    """
Snakefile, lines 126-134:
shell:
    """
    rm -f {params.NEO4J_IMPORTDIR}/{params.NODEDIR}/merged/*
    rm -f {params.NEO4J_IMPORTDIR}/{params.NODEDIR}/created.txt
    rm -f {params.NEO4J_IMPORTDIR}/{params.RELDIR}/created.txt
    rm -f {params.NEO4J_IMPORTDIR}/master*
    rm -f {params.SNAKEMAKELOGS}/master*
    rm -f {params.SNAKEMAKELOGS}/import_report.log
    """       
Snakefile, lines 146-181:
shell:
    """
    echo 'Starting database...'
    #force load of .env file if it exists to avoid docker issues with cached variables
    if [ -f .env ]; then export $(cat .env | sed 's/#.*//g' | xargs); fi
    #create neo4j directories if not already done
    echo 'Creating Neo4j graph directories'
    python -m workflow.scripts.graph_build.create_neo4j > {log.graph} 2>&1
    #create container
    docker-compose up -d 
    #docker-compose up -d --no-recreate 
    echo 'removing old database...'
    docker exec --user neo4j {CONTAINER_NAME} sh -c 'rm -rf /var/lib/neo4j/data/databases/neo4j' >> {log.graph} 2>&1
    docker exec --user neo4j {CONTAINER_NAME} sh -c 'rm -f /var/lib/neo4j/data/transactions/neo4j/*' >> {log.graph} 2>&1
    echo 'running import...'
    SECONDS=0
    docker exec --user neo4j {CONTAINER_NAME} sh /var/lib/neo4j/import/master_import.sh > {log.build} 2>&1
    duration=$SECONDS
    echo "Import took $(($duration / 60)) minutes and $(($duration % 60)) seconds."
    echo 'stopping container {CONTAINER_NAME}...'
    docker stop {CONTAINER_NAME}
    echo 'starting container {CONTAINER_NAME}...'
    docker start {CONTAINER_NAME}
    echo 'waiting a bit...'
    sleep 30
    echo 'adding constraints and extra bits...'
    docker exec --user neo4j {CONTAINER_NAME} sh /var/lib/neo4j/import/master_constraints.sh > {log.constraints} 2>&1
    echo 'waiting a bit for indexes to populate...'
    sleep 30
    echo 'checking import report...'
    python -m workflow.scripts.graph_build.import-report-check {NEO4J_LOGDIR}/import.report > {output}
    echo 'running tests...'
    python -m pytest -vv
    echo 'Neo4j browser available here: http://{NEO4J_ADDRESS}:{NEO4J_HTTP}/browser'
    #open http://{NEO4J_ADDRESS}:{NEO4J_HTTP}/browser
    """
Snakefile, lines 189-194:
shell: 
    """
    #rm -f {NEO4J_IMPORTDIR}/{NODEDIR}/merged/*
    python -m workflow.scripts.graph_build.merge_sources > {log} 2>&1
    python -m workflow.scripts.graph_build.create_master_import >> {log} 2>&1
    """
Snakefile, line 203:
shell: "echo $(date) > {NEO4J_IMPORTDIR}/{NODEDIR}/created.txt"
Snakefile, lines 215-227:
shell: 
    """
    #make neo4j directory
    d={NEO4J_IMPORTDIR}/{NODEDIR}/{params.meta_id}
    mkdir -p $d

    #clean up any old import and constraint data
    rm -f $d/{params.meta_id}-import-nodes.txt
    rm -f $d/{params.meta_id}-constraint.txt

    #run the processing script
    python -m {params.PROCESSINGDIR}.{params.metaData[script]} -n {params.meta_id} > {log} 2>&1
    """
Snakefile, line 236:
shell: "echo $(date) > {NEO4J_IMPORTDIR}/{RELDIR}/created.txt"
Snakefile, lines 248-260:
shell: 
    """
    #make directory
    d={NEO4J_IMPORTDIR}/{RELDIR}/{params.meta_id}
    mkdir -p $d

    #clean up any old import and constraint data
    rm -f $d/{params.meta_id}-import-rels.txt
    rm -f $d/{params.meta_id}-constraint.txt

    #run the processing script
    python -m {params.PROCESSINGDIR}.{params.metaData[script]} -n {params.meta_id} > /dev/null 2> {log}
    """
Snakefile, lines 264-267:
shell: 
    """
    python -m workflow.scripts.graph_build.create_neo4j_backup > {log} 2>&1
    """



Created: 1yr ago
Updated: 1yr ago
Maintainers: public
URL: https://github.com/Lekanville/epg
Name: epg
Version: 1

Downloaded: 0
Copyright: Public Domain
License: MIT License
