Neo4j Data Integration and Build Pipeline for EpiGraphDB: Graph Creation from Multiple Data Sources


Neo4j data integration and build pipeline - https://github.com/elswob/neo4j-build-pipeline

This pipeline originated from the work done to create the graph for EpiGraphDB. With over 20 separate data sets, more than 10 node types and more than 40 relationship types, we needed a pipeline that would make it relatively easy for others in the group to contribute. By combining Snakemake, Docker, Neo4j and GitHub Actions we have created a pipeline that can build a fully tested Neo4j graph database from raw data.

One of the main aims of this pipeline was performance. Initial efforts used the LOAD CSV method, but this quickly became slow as the size and complexity of the graph increased. Here we focus on creating clean data sets that can be loaded with the neo4j-admin import tool, bringing build time down from hours to minutes.

Components of interest:

  • The pipeline can be used to prepare raw data, create files for the graph, or build the graph.

  • A defined schema is used to QC all data before loading.

  • Merging multiple data sets into a single node type is handled automatically.

  • Uses neo4j-admin import to build the graph (see the sketch after this list for the general shape of the command).
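
For orientation, the sketch below shows the general shape of the import command that neo4j-admin expects, assembled in Python. The labels, relationship types and file names are hypothetical; the pipeline generates its own master import script with the real arguments during the build.

#Hypothetical sketch: assemble a neo4j-admin import command from prepared CSV files.
#All labels, types and file names here are made up for illustration only.
import shlex

def build_import_command(nodes, rels, database="neo4j"):
    #nodes/rels map a node label or relationship type to a (header_csv, data_csv) pair
    parts = ["bin/neo4j-admin", "import", f"--database={database}"]
    for label, (header, data) in nodes.items():
        parts.append(f"--nodes={label}={header},{data}")
    for rel_type, (header, data) in rels.items():
        parts.append(f"--relationships={rel_type}={header},{data}")
    return " ".join(shlex.quote(p) for p in parts)

print(build_import_command(
    nodes={"Gene": ("gene-header.csv", "gene.csv.gz")},
    rels={"EXPRESSED_IN": ("expressed-in-header.csv", "expressed-in.csv.gz")},
))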

Note:

  • This is not a fully tested pipeline; there are known issues with Docker and Neo4j that need careful consideration.

Prerequisites

Conda (required)

Install miniconda3

  • https://docs.conda.io/en/latest/miniconda.html
wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
sh Miniconda3-latest-Linux-x86_64.sh

Docker and Docker Compose (only required if building a graph)

Docker (17.06.0-ce) - https://docs.docker.com/get-docker/
Docker Compose (v1.27.4) - https://docs.docker.com/compose/install/

shuf (or gshuf)

This should already be available on most Linux distributions; on macOS you may need to install coreutils

brew install coreutils

Basic setup

The following will run the demo data and create a basic graph

#clone the repo (use https if necessary)
git clone git@github.com:elswob/neo4j-build-pipeline.git
cd neo4j-build-pipeline
#create the conda environment
conda env create -f environment.yml
conda activate neo4j_build
#create a basic environment variable file for demo data - this probably requires some edits, but may work as is
cp example.env .env 
#run the pipeline
snakemake -r all -j 4

Full setup

Clone the repo

If just testing, simply clone the repo with git clone git@github.com:elswob/neo4j-build-pipeline.git and skip straight to Create conda environment

If creating a new pipeline and graph based on this repo there are two options:

  1. Fork the repo and skip straight to Create conda environment

  2. Create a copy of the repo, see below:

Create a new GitHub repo
  • this follows the method described here - https://github.com/manubot/rootstock/blob/master/SETUP.md#configuration

Create an empty GitHub repository at https://github.com/new.

Make a note of user and repo name

OWNER=xxx
REPO=abc
Clone the repo and reconfigure
git clone git@github.com:elswob/neo4j-build-pipeline.git
cd neo4j-build-pipeline

Set the origin URL to match the repo created above, using either HTTPS or SSH

#option 1: HTTPS
git remote set-url origin https://github.com/$OWNER/$REPO.git
#option 2: SSH
git remote set-url origin git@github.com:$OWNER/$REPO.git

Push to new repo

git push --set-upstream origin main

Create conda environment

conda env create -f environment.yml
conda activate neo4j_build

Run build tests

snakemake -r clean_all -j 1
snakemake -r check_new_data -j 4

Adding new data

See the ADDING_DATA document in the repository for details on how to add new data sets.

Build graph

A complete run of the pipeline will create a Neo4j graph within a Docker container on the machine running the pipeline. The variables used for this are defined in the .env file.

1. Create .env file

Copy example.env to .env and edit

cp example.env .env
  • Modify this file to match your setup

  • No spaces in paths please :)

  • Use absolute/relative paths where stated

  • If using a remote server for raw data and backups, set SERVER_NAME and set up SSH keys (see the Remote Server documentation)

### Data integration variables
#version of graph being built
GRAPH_VERSION=0.0.1
#location of snakemake logs (relative or absolute)
SNAKEMAKE_LOGS=demo/results/logs
#neo4j directories (absolute)
NEO4J_IMPORT_DIR=./demo/neo4j/0.0.1/import
NEO4J_DATA_DIR=./demo/neo4j/0.0.1/data
NEO4J_LOG_DIR=./demo/neo4j/0.0.1/logs
#path to directory containing source data (absolute)
DATA_DIR=demo/source_data
#path to directory containing data processing script directories and code (relative)
PROCESSING_DIR=demo/scripts/processing
#path to directory for graph data backups (relative or absolute)
GRAPH_DIR=demo/results/graph_data
#path to config (relative or absolute)
CONFIG_PATH=demo/config
#name of server if source data is on a remote machine, not needed if all data are local
#SERVER_NAME=None
#number of threads to use for parallel parts
THREADS=10
############################################################################################################
#### Docker things for building graph, ignore if not using
# GRAPH_CONTAINER_NAME:
# Used in docker-compose and snakefile to
# assign container name to the db service to use docker exec
GRAPH_CONTAINER_NAME=neo4j-pipeline-demo-graph
#Neo4j server address (this will be the server running the pipeline and be used to populate the Neo4j web server conf)
NEO4J_ADDRESS=neo4j.server.com
# Neo4j connection
GRAPH_USER=neo4j
GRAPH_HOST=localhost
GRAPH_PASSWORD=changeme
GRAPH_HTTP_PORT=27474
GRAPH_BOLT_PORT=27687
GRAPH_HTTPS_PORT=27473
# Neo4j memory
# Set these to something suitable, for testing the small example data 1G should be fine. For anything bigger, see https://neo4j.com/developer/kb/how-to-estimate-initial-memory-configuration/
GRAPH_HEAP_INITIAL=1G
GRAPH_PAGECACHE=1G
GRAPH_HEAP_MAX=2G
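
The pipeline and the troubleshooting commands later in this document read these values from the environment (e.g. export $(cat .env | sed 's/#.*//g' | xargs)). As a rough Python equivalent, a minimal .env loader might look like the sketch below; it is illustrative only and not part of the pipeline.

#Sketch: load KEY=VALUE pairs from .env into the environment, skipping comments and blank
#lines; a Python mirror of the shell export $(cat .env | sed 's/#.*//g' | xargs) trick
import os

def load_env(path=".env"):
    with open(path) as fh:
        for line in fh:
            line = line.split("#", 1)[0].strip()  #drop comments (values must not contain #)
            if not line or "=" not in line:
                continue
            key, value = line.split("=", 1)
            os.environ[key.strip()] = value.strip()

load_env()
print(os.environ.get("GRAPH_HTTP_PORT"))  #27474 with the demo values above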

2. Build the graph

snakemake -r all -j 4

You should then be able to explore the graph via the Neo4j Browser by visiting the URL of the server hosting the graph plus the GRAPH_HTTP_PORT number specified, e.g. localhost:27474. Here you can log in with the following (or connect programmatically, as sketched after this list):

  • Connect URL = bolt://name_of_server:GRAPH_BOLT_PORT (from .env)

  • Authentication type = Username/Password

  • Username = GRAPH_USER from .env

  • Password = GRAPH_PASSWORD from .env
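
As an alternative to the browser, the connection can be checked programmatically. The sketch below assumes the official neo4j Python driver (pip install neo4j) and the demo values from the .env above; adjust the host, port and credentials to your own setup.

#Sketch: quick connection check using the official neo4j Python driver.
#Host, port and credentials assume the demo .env values shown earlier.
from neo4j import GraphDatabase

uri = "bolt://localhost:27687"  #GRAPH_HOST and GRAPH_BOLT_PORT from .env
driver = GraphDatabase.driver(uri, auth=("neo4j", "changeme"))  #GRAPH_USER / GRAPH_PASSWORD

with driver.session() as session:
    node_count = session.run("MATCH (n) RETURN count(n) AS n").single()["n"]
    print(f"Graph contains {node_count} nodes")

driver.close()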

Potential problems

Adding Neo4j array properties

Because neo4j-admin import expects arrays in a particular format, all array values need to be separated by ; and have no surrounding [] or quotes. To help with this, there is a function create_neo4j_array_from_array in workflow.scripts.utils.general .
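
A minimal sketch of what such a helper might do is shown below; the actual create_neo4j_array_from_array in the pipeline may differ in detail.

#Sketch: convert a Python list into the string format neo4j-admin import expects for
#array properties - values joined by ';' with no surrounding brackets or quotes.
#The real create_neo4j_array_from_array helper may behave differently.
def to_neo4j_array(values, delimiter=";"):
    return delimiter.join(str(v) for v in values)

print(to_neo4j_array(["GO:0008150", "GO:0003674"]))  #GO:0008150;GO:0003674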

docker-compose version

If you have an old version of docker-compose, just pip install a new one :)

pip install --user docker-compose

Neo4j directories need to be created before creating the Neo4j container

Due to issues with Neo4j 4.* and Docker, the Neo4j directories need to be created manually before building the graph. If this is not done, Docker will create the Neo4j directories itself and make them unreadable.

  • this happens during the Snakemake create_graph rule via workflow.scripts.graph_build.create_neo4j

  • to run this manually

python -m workflow.scripts.graph_build.create_neo4j
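
In essence, this step just ensures that the three Neo4j directories named in .env exist before docker-compose starts the container. A rough sketch of the idea (the real create_neo4j script may do more than this):

#Sketch: pre-create the Neo4j directories named in .env so Docker does not create
#them itself and leave them unreadable. The real create_neo4j script may do more.
import os

for var in ("NEO4J_IMPORT_DIR", "NEO4J_DATA_DIR", "NEO4J_LOG_DIR"):
    path = os.environ.get(var)
    if path:
        os.makedirs(path, exist_ok=True)
        print(f"created {path}")
    else:
        print(f"{var} is not set - check your .env file")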

Port clashes

If you see this error:

Bind for 0.0.0.0:xxx failed: port is already allocated

then you need to change the ports in .env, as they are already being used by another container:

GRAPH_HTTP_PORT=17474
GRAPH_BOLT_PORT=17687
GRAPH_HTTPS_PORT=17473

Docker group

When building the graph, if the user is not part of the docker group you may see an error like this:

Starting database...
Creating Neo4j graph directories
.....
PermissionError: [Errno 13] Permission denied

To fix this, the user needs to be added to the docker group

https://docs.docker.com/engine/install/linux-postinstall/

sudo usermod -aG docker $USER

Access denied

If connections result in the following:

The client is unauthorized due to authentication failure.

There may be an issue with authentication.

First

  • check the password used and make sure it doesn't contain any special characters such as #. If it does, change the password, reload the .env file, then rebuild:
export $(cat .env | sed 's/#.*//g' | xargs);
snakemake -r clean_all -j1
snakemake -r all -j1

Second

  • it is possible to reset a password

  • https://neo4j.com/docs/operations-manual/4.0/configuration/password-and-user-recovery/

  1. Get variables from .env file
export $(cat .env | sed 's/#.*//g' | xargs)
  2. Disable auth in docker-compose.yml
- NEO4J_dbms_security_auth__enabled=false

Restart container

docker-compose down
docker-compose up -d
  3. Reset the password
docker exec -it $GRAPH_CONTAINER_NAME cypher-shell -a localhost:$GRAPH_BOLT_PORT -d system
ALTER USER neo4j SET PASSWORD 'changeme';
  4. Enable auth in docker-compose.yml
- NEO4J_dbms_security_auth__enabled=true

Restart container

docker-compose down
docker-compose up -d
  5. Check connection

If this has worked, you will be asked for a username and password, and the connection will succeed.

docker exec -it $GRAPH_CONTAINER_NAME cypher-shell -a localhost:$GRAPH_BOLT_PORT -d system

Saving and restoring database

Creating a backup

  • https://neo4j.com/docs/operations-manual/current/docker/maintenance/#docker-neo4j-backup
snakemake -r backup_graph -j1

Restoring a backup

On the production server, create a data directory

mkdir data
chmod 777 data

Move dump into data

mv neo4j ./data

Start container

docker-compose -f docker-compose-public.yml up -d

Stop Neo4j but keep the container running

public_container=db-public
docker exec -it $public_container cypher-shell -a neo4j://localhost:1234 -d system "stop database neo4j;"

Restore the backup

docker exec -it $public_container bin/neo4j-admin restore --from data/neo4j --verbose --force

Restart the database

docker exec -it $public_container cypher-shell -a neo4j://localhost:1234 -d system "start database neo4j;"

Merging upstream changes

Again, based on logic from here - https://github.com/manubot/rootstock/blob/master/SETUP.md#merging-upstream-rootstock-changes

#checkout new branch
git checkout -b nbp-$(date '+%Y-%m-%d')

Pull new commits from neo4j-build-pipeline

#if remote not set
git config remote.neo4j-build-pipeline.url || git remote add neo4j-build-pipeline https://github.com/elswob/neo4j-build-pipeline.git
#pull new commits
git pull --no-ff --no-rebase --no-commit neo4j-build-pipeline main

If no problems, commit new updates

git commit -am 'merging upstream changes'
git push origin nbp-$(date '+%Y-%m-%d')

Then open a pull request

Visualise

https://snakemake.readthedocs.io/en/v5.1.4/executable.html#visualization

snakemake -r all --rulegraph | dot -Tpdf > rulegraph.pdf


snakemake -r all --dag | dot -Tpdf > dag.pdf


Report

https://snakemake.readthedocs.io/en/stable/snakefiles/reporting.html

Run this after the workflow has finished
snakemake --report report.html

Code Snippets

run:
    #open output file
    o = open(output[0], "w")
    #validate data integration config file
    if NODEDIR in config:
        nodes = config[NODEDIR]
        for i in nodes:
            o.write(f"integration node {i}\n")
            validate(nodes[i], os.path.join(os.getcwd(),DATA_CONFIG_SCHEMA))
    if RELDIR in config:
        rels = config[RELDIR]
        for i in rels:
            o.write(f"integration rel {i}\n")
            validate(rels[i], os.path.join(os.getcwd(),DATA_CONFIG_SCHEMA))

    #validate db schema config file
    with open(os.path.join(CONFIG_PATH,"db_schema.yaml")) as file:
        db_schema = yaml.load(file,Loader=yaml.FullLoader)
        if 'meta_nodes' in db_schema:
            nodes = db_schema['meta_nodes']
            for i in nodes:
                o.write(f"schema node {i}\n")
                validate(nodes[i], os.path.join(os.getcwd(),DB_SCHEMA_NODES_SCHEMA))
        else:
            print('The db schema has no nodes!')
            exit()
        if 'meta_rels' in db_schema:
            rels = db_schema['meta_rels']
            for i in rels:
                o.write(f"schema rel {i}\n")
                validate(rels[i], os.path.join(os.getcwd(),DB_SCHEMA_RELS_SCHEMA))
    o.close()
shell:
    """
    echo 'Deleting {params.NEO4J_IMPORTDIR}/{params.NODEDIR}/merged/*'
    rm -f {params.NEO4J_IMPORTDIR}/{params.NODEDIR}/merged/*
    echo 'Deleting {params.NEO4J_IMPORTDIR}/{params.NODEDIR}/created.txt'
    rm -f {params.NEO4J_IMPORTDIR}/{params.NODEDIR}/created.txt
    echo 'Deleting {params.NEO4J_IMPORTDIR}/{params.RELDIR}/created.txt'
    rm -f {params.NEO4J_IMPORTDIR}/{params.RELDIR}/created.txt
    echo 'Deleting find {params.SNAKEMAKELOGS}/{params.NODEDIR} -name "*.log" -delete'
    if [ -d {params.SNAKEMAKELOGS}/{params.NODEDIR} ]; then find {params.SNAKEMAKELOGS}/{params.NODEDIR} -name "*.log" -delete; fi
    echo 'Deleting find {params.SNAKEMAKELOGS}/{params.RELDIR} -name "*.log" -delete'
    if [ -d {params.SNAKEMAKELOGS}/{params.RELDIR} ]; then find {params.SNAKEMAKELOGS}/{params.RELDIR} -name "*.log" -delete; fi
    echo 'Deleting {params.NEO4J_IMPORTDIR}/master*'
    rm -f {params.NEO4J_IMPORTDIR}/master*
    echo 'Deleting {params.SNAKEMAKELOGS}/*.log'
    rm -f {params.SNAKEMAKELOGS}/*.log
    echo 'Deleting {params.NEO4J_IMPORTDIR}/logs/*'
    rm -f {params.NEO4J_IMPORTDIR}/logs/*
    #not sure if below is too severe
    echo 'Deleting find {params.NEO4J_IMPORTDIR}/{params.NODEDIR} -name "*.csv.gz" -delete -o -name "*import-nodes.txt" -delete'
    if [ -d {params.NEO4J_IMPORTDIR}/{params.NODEDIR} ]; 
        then find {params.NEO4J_IMPORTDIR}/{params.NODEDIR} -name "*.csv.gz" -delete -o -name "*import-nodes.txt" -delete; 
    else
        echo "{params.NEO4J_IMPORTDIR}/{params.NODEDIR} is missing"
    fi
    echo 'Deleting find {params.NEO4J_IMPORTDIR}/{params.RELDIR} -name "*.csv.gz" -delete -o -name "*import-nodes.txt" -delete'
    if [ -d {params.NEO4J_IMPORTDIR}/{params.RELDIR} ]; 
        then find {params.NEO4J_IMPORTDIR}/{params.RELDIR} -name "*.csv.gz" -delete -o -name "*import-nodes.txt" -delete; 
    else
        echo "{params.NEO4J_IMPORTDIR}/{params.RELDIR} is missing"
    fi
    """
shell:
    """
    rm -f {params.NEO4J_IMPORTDIR}/{params.NODEDIR}/merged/*
    rm -f {params.NEO4J_IMPORTDIR}/{params.NODEDIR}/created.txt
    rm -f {params.NEO4J_IMPORTDIR}/{params.RELDIR}/created.txt
    rm -f {params.NEO4J_IMPORTDIR}/master*
    rm -f {params.SNAKEMAKELOGS}/master*
    rm -f {params.SNAKEMAKELOGS}/import_report.log
    """       
shell:
    """
    echo 'Starting database...'
    #force load of .env file if it exists to avoid docker issues with cached variables
    if [ -f .env ]; then export $(cat .env | sed 's/#.*//g' | xargs); fi
    #create neo4j directories if not already done
    echo 'Creating Neo4j graph directories'
    python -m workflow.scripts.graph_build.create_neo4j > {log.graph} 2> {log.graph}
    #create container
    docker-compose up -d 
    #docker-compose up -d --no-recreate 
    echo 'removing old database...'
    docker exec --user neo4j {CONTAINER_NAME} sh -c 'rm -rf /var/lib/neo4j/data/databases/neo4j' > {log.graph} 2> {log.graph}
    docker exec --user neo4j {CONTAINER_NAME} sh -c 'rm -f /var/lib/neo4j/data/transactions/neo4j/*' > {log.graph} 2> {log.graph}
    echo 'running import...'
    SECONDS=0
    docker exec --user neo4j {CONTAINER_NAME} sh /var/lib/neo4j/import/master_import.sh > {log.build} 2> {log.build}
    duration=$SECONDS
    echo "Import took $(($duration / 60)) minutes and $(($duration % 60)) seconds."
    echo 'stopping container {CONTAINER_NAME}...'
    docker stop {CONTAINER_NAME}
    echo 'starting container {CONTAINER_NAME}...'
    docker start {CONTAINER_NAME}
    echo 'waiting a bit...'
    sleep 30
    echo 'adding constraints and extra bits...'
    docker exec --user neo4j {CONTAINER_NAME} sh /var/lib/neo4j/import/master_constraints.sh > {log.constraints} 2> {log.constraints}
    echo 'waiting a bit for indexes to populate...'
    sleep 30
    echo 'checking import report...'
    python -m workflow.scripts.graph_build.import-report-check {NEO4J_LOGDIR}/import.report > {output}
    echo 'running tests...'
    python -m pytest -vv
    echo 'Neo4j browser available here: http://{NEO4J_ADDRESS}:{NEO4J_HTTP}/browser'
    #open http://{NEO4J_ADDRESS}:{NEO4J_HTTP}/browser
    """
shell: 
    """
    #rm -f {NEO4J_IMPORTDIR}/{NODEDIR}/merged/*
    python -m workflow.scripts.graph_build.merge_sources > {log} 2> {log}
    python -m workflow.scripts.graph_build.create_master_import > {log} 2> {log}
    """
shell: "echo `date` > {NEO4J_IMPORTDIR}/{NODEDIR}/created.txt"
shell: 
    """
    #make neo4j directory
    d={NEO4J_IMPORTDIR}/{NODEDIR}/{params.meta_id}
    mkdir -p $d

    #clean up any old import and constraint data
    rm -f $d/{params.meta_id}-import-nodes.txt
    rm -f $d/{params.meta_id}-constraint.txt

    #run the processing script
    python -m {params.PROCESSINGDIR}.{params.metaData[script]} -n {params.meta_id} > {log} 2> {log}
    """
shell: "echo `date` > {NEO4J_IMPORTDIR}/{RELDIR}/created.txt"
shell: 
    """
    #make directory
    d={NEO4J_IMPORTDIR}/{RELDIR}/{params.meta_id}
    mkdir -p $d

    #clean up any old import and constraint data
    rm -f $d/{params.meta_id}-import-rels.txt
    rm -f $d/{params.meta_id}-constraint.txt

    #run the processing script
    python -m {params.PROCESSINGDIR}.{params.metaData[script]} -n {params.meta_id} > /dev/null 2> {log}
    """
shell: 
    """
    python -m workflow.scripts.graph_build.create_neo4j_backup > {log} 2> {log}
    """
