Personal data redaction on images based on FHIR patient resources.
This workflow redacts personal information on given images. The personal information must be provided as a FHIR Patient resource.
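For orientation, the sketch below shows a minimal FHIR Bundle containing a Patient resource with the fields the extraction script reads (name, birthDate, address, telecom). All values are made-up placeholders; a real export will contain additional resources and fields.

```
{
  "resourceType": "Bundle",
  "entry": [
    {
      "resource": {
        "resourceType": "Patient",
        "name": [{ "family": "Mustermann", "given": ["Erika", "Maria"] }],
        "birthDate": "1970-01-31",
        "address": [
          {
            "line": ["Musterstrasse 1"],
            "postalCode": "45147",
            "city": "Essen"
          }
        ],
        "telecom": [{ "system": "phone", "value": "0201 1234567" }]
      }
    }
  ]
}
```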
Authors
- Thomas Battenfeld (@thomasbtf)
- Simon Magin (@simakro)
- Josefa Welling (@josefawelling)
- Christin Seifert
- Folker Meyer (@folker)
Usage
Step 1: Obtain a copy of this workflow
If you simply want to use this workflow, download and extract the latest release. If you intend to modify and further extend this workflow or want to work under version control, fork this repository as outlined in Advanced. The latter way is recommended.
In any case, if you use this workflow in a paper, don't forget to give credit to the authors by citing the URL of this repository and, if available, its DOI (see above).
Step 2: Configure workflow
Configure the workflow according to your needs by editing the files in the config/ folder. Adjust config/config.yaml to configure the workflow execution, and config/pep/documents.csv to specify your documents and metadata.
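The exact schema is defined by the files shipped in config/. As a minimal sketch, the two thresholds that the identification scripts read from the config are shown below; the values are only the illustrative defaults used in the scripts, and the shipped example config remains the authoritative reference.

```
# config/config.yaml (sketch; see the shipped example for the full schema)
min-confidence: 0.6   # minimal OCR confidence score for detected words
max-distance: 2       # maximal Levenshtein distance between detected text and personal data
```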
Step 3: Install Snakemake
Install Snakemake using conda:
conda create -c bioconda -c conda-forge -n snakemake snakemake
For installation details, see the instructions in the Snakemake documentation.
Step 4: Execute workflow
Activate the conda environment:
conda activate snakemake
Test your configuration by performing a dry-run via
snakemake --use-conda -n
Then execute the workflow with $N cores via
snakemake --use-conda --cores $N
If you want to fix not only the software stack but also the underlying OS, use
snakemake --use-conda --use-singularity
in combination with the modes above. See the Snakemake documentation for further details.
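For example, an illustrative invocation that pins both the software stack (conda) and the OS layer (singularity) on 4 cores:
snakemake --use-conda --use-singularity --cores 4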
Step 5: Investigate results
After successful execution, you can create a self-contained interactive HTML report with all results via:
snakemake --report report.zip
This report can, e.g., be forwarded to your collaborators. An example (using some trivial test data) can be seen here.
Advanced
The following recipe provides established best practices for running and extending this workflow in a reproducible way.
- Fork the repo to a personal or lab account.
- Clone the fork to the desired working directory for the concrete project/run on your machine.
- Create a new branch (the project-branch) within the clone and switch to it. The branch will contain any project-specific modifications (e.g. to configuration, but also to code).
- Modify the config, and any necessary sheets (and probably the workflow) as needed.
- Commit any changes and push the project-branch to your fork on GitHub.
- Run the analysis.
- Optional: Merge back any valuable and generalizable changes to the upstream repo via a pull request. This would be greatly appreciated.
- Optional: Push results (plots/tables) to the remote branch on your fork.
- Optional: Create a self-contained workflow archive for publication along with the paper (snakemake --archive).
- Optional: Delete the local clone/workdir to free space.
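A minimal command sketch of this recipe (the repository URL, directory, and branch names are placeholders for your own fork and project):

```
# 1.-3. fork on GitHub, clone the fork, create and switch to a project branch
git clone https://github.com/<your-account>/<fork-of-this-workflow>.git myproject
cd myproject
git checkout -b myproject-run1

# 4.-5. adjust config and sheets, then commit and push the project branch
git add config/
git commit -m "configure myproject-run1"
git push -u origin myproject-run1

# 6. run the analysis
snakemake --use-conda --cores 4
```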
Testing
Test cases are in the subfolder .test. They are automatically executed via continuous integration with GitHub Actions.
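To run the test case locally before relying on CI, you can point Snakemake at that directory; this is a sketch using standard Snakemake flags, so adjust the core count and any targets as needed:
snakemake -d .test --use-conda --cores 2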
Code Snippets
```
script: "../scripts/extract-personal-data.py"
shell: "(mkdir -p {output} && lz4 -dc --no-sparse {input} | tar -xf - -C {output}) 2> {log}"
shell: '(unzip "{input}" -d "{output}") > "{log}" 2>&1'
shell: "(mkdir -p {output} && cp -r {params.in_dir}/* {output}) 2> {log}"
script: "../scripts/scan_decomp_data.py"
script: "../scripts/fix_filenames.py"
script: "../scripts/summarize-found-personal-data.py"
script: "../scripts/create-paths-for-manually-checking.py"
shell: "(cp '{input}' '{output}') 2> '{log}'"
shell: "(cp '{input}' '{output}') 2> '{log}'"
shell: "(cp '{input}' '{output}') 2> '{log}'"
shell: "(cp '{input}' '{output}') 2> '{log}'"
shell: "(rm {params.escaped_input}) 2> {log}"
script: "../scripts/summarize-manuel-checks.py"
script: "../scripts/plot-manuel-check-summary.py"
script: "../scripts/preprocess-page.py"
script: "../scripts/identify-personal-data.py"
script: "../scripts/redact-page.py"
```
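The directives above are excerpts from the workflow's rule files. In context, each sits inside a Snakemake rule roughly like the following sketch; the rule name and the input/output/log/conda paths are illustrative, only the script path is taken from the excerpts above.

```
rule extract_personal_data:
    input:
        "results/{document}/fhir.json",               # illustrative input path
    output:
        "results/{document}/personal_data.json",      # illustrative output path
    log:
        "logs/{document}/extract_personal_data.log",  # illustrative log path
    conda:
        "../envs/pandas.yaml"                         # illustrative environment file
    script:
        "../scripts/extract-personal-data.py"
```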
```
import sys

sys.stderr = open(snakemake.log[0], "w")

import pandas as pd


def save_df(df: pd.DataFrame, out_path: str):
    df.to_csv(out_path, sep="\t", index=False, header=False)


def no_redaction(summary_df: pd.DataFrame, out_path: str):
    save_df(summary_df[summary_df["# personal data"] == 0][["processed img"]], out_path)


def high_degree_of_redaction(summary_df: pd.DataFrame, out_path: str):
    save_df(
        summary_df[summary_df["# personal data"] >= 10][["processed img"]], out_path
    )


def partly_found_address(summary_df: pd.DataFrame, out_path: str):
    if "city" in summary_df.columns and "address" in summary_df.columns:
        df = summary_df[summary_df["city"] != summary_df["address"]][["processed img"]]
    else:
        df = pd.DataFrame(columns=["processed img"])
    save_df(df, out_path)


def partly_found_name(summary_df: pd.DataFrame, out_path: str):
    if "name_family" in summary_df.columns and "name_first_0" in summary_df.columns:
        df = summary_df[summary_df["name_family"] != summary_df["name_first_0"]][
            ["processed img"]
        ]
    else:
        df = pd.DataFrame(columns=["processed img"])
    save_df(df, out_path)


if __name__ == "__main__":
    summary_df = pd.read_csv(snakemake.input[0], sep="\t")
    summary_df.fillna(999999999.0, inplace=True)

    no_redaction(summary_df, snakemake.output.no_redaction)
    high_degree_of_redaction(summary_df, snakemake.output.high_degree_of_redaction)
    partly_found_address(summary_df, snakemake.output.partly_found_address)
    partly_found_name(summary_df, snakemake.output.partly_found_name)
```
```
import sys

sys.stderr = open(snakemake.log[0], "w")

import itertools
import json
from collections import defaultdict


def parse_meta_data(json_path: str) -> defaultdict:
    """Parses the FHIR metadata and extracts personal data.

    The extracted data is redacted in the further course of the workflow.

    Args:
        json_path (str): path to FHIR metadata

    Returns:
        defaultdict: personal data of the patient
    """
    with open(json_path) as json_file:
        data = json.load(json_file)

    # select the patient resource from the bundle data export
    for ele in data.get("entry", {}):
        # iterate over entries
        for key, value in ele.get("resource", {}).items():
            if key == "resourceType" and value == "Patient":
                data = ele.get("resource")
                break

    # TODO design this part more flexibly, maybe via the snakemake config file
    # ---------------------------------------
    personal_data = defaultdict()

    first_name_count = 0
    for i, first_name in enumerate(data.get("name")[0].get("given")):
        first_name_count += 1
        personal_data["name_first_{}".format(i)] = first_name

    personal_data["name_family"] = data.get("name")[0].get("family")
    personal_data["birthDate"] = data.get("birthDate")
    personal_data["address"] = data.get("address")[0].get("line")[0]
    personal_data["city"] = " ".join(
        [data.get("address")[0].get("postalCode"), data.get("address")[0].get("city")]
    )
    personal_data["case_number"] = json_path.split("/")[-1].split(".")[0]

    for com in data.get("telecom", {}):
        com_type = com.get("system", {})
        personal_data[com_type] = com.get("value", {})

    # personal_data["gender"] = data.get("gender")
    # personal_data["country"] = data.get("address")[0].get("country")
    # ---------------------------------------

    return personal_data, first_name_count


def variate_personal_data(personal_data: dict, first_name_count: int) -> defaultdict:
    # permutate names
    names_simple = set((personal_data["name_first_0"], personal_data["name_family"]))
    names_all = set()
    for i in range(first_name_count):
        names_all.add(personal_data["name_first_{}".format(i)])
    names_all.add(personal_data["name_family"])

    name_perms = list(itertools.permutations(list(names_simple)))
    if names_simple != names_all:
        names_all_perm = list(itertools.permutations(list(names_all)))
        name_perms.extend(names_all_perm)

    for i, perm in enumerate(name_perms):
        personal_data[f"name_perm_{i}"] = ",".join(perm)

    # variate phone number
    provider_local_codes = [
        "01511", "01512", "01514", "01515", "01516", "01517",
        "01520", "01522", "01523", "01525", "015566", "01570",
        "01573", "01575", "01577", "01578", "01590", "0160",
        "0162", "0163", "0170", "0171", "0172", "0173", "0174",
        "0175", "0176", "0177", "0178", "0179",
    ]

    # it would be much better to generate this list only once centrally instead of for every patient sample again
    with open("resources/Vorwahlen_Festnetz_Bundesnetzagentur.csv", "r") as local_codes:
        for line in local_codes:
            if line.startswith("Ortsnetzkennzahl"):
                pass
            else:
                provider_local_codes.append("0" + line.split(";")[0])

    nums = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "0"]
    tmp_phone = personal_data.get("phone", "")
    for letter in tmp_phone:
        if letter not in nums:
            tmp_phone = personal_data["phone"].replace(letter, "")
    personal_data["phone_perm0"] = tmp_phone

    for code in provider_local_codes:
        if tmp_phone.startswith(code):
            pre_code = code
            break
        else:
            pre_code = tmp_phone[:4]

    seperators = ["/", "\\", "-", " ", "_", ".", ":"]
    for i, sep in enumerate(seperators):
        personal_data[f"phone_perm{i+1}"] = (
            tmp_phone[: len(pre_code) + 1] + sep + tmp_phone[len(pre_code) + 1 :]
        )

    # variate birthdate
    yr, m, dy = personal_data["birthDate"].split("-")
    for i, sep in enumerate(seperators):
        personal_data[f"birthDate_perm{i}"] = f"{dy}{sep}{m}{sep}{yr}"
        personal_data[f"birthDate_perm{i}"] = f"*{dy}{sep}{m}{sep}{yr}"
        personal_data[f"birthDate_perm{i}{i}"] = f"{yr}{sep}{m}{sep}{dy}"

    # variate country

    return personal_data


def add_additional_personal_data(add_json_path: str, personal_data: dict) -> defaultdict:
    # if an additional data file exists, this data will be added to the personal data json file
    with open(add_json_path) as json_file:
        additional_data = json.load(json_file)
    personal_data.update(additional_data)
    return personal_data


def save_personal_data(personal_data: dict, out_path: str):
    """Save the final dict with the personal data as json.

    Args:
        personal_data (dict): dict with the personal data that is to be removed
        out_path (str): path to save the json to
    """
    with open(out_path, "w") as fp:
        json.dump(personal_data, fp, indent=2)


if __name__ == "__main__":
    personal_data = parse_meta_data(snakemake.input[0])
    var_data = variate_personal_data(personal_data[0], personal_data[1])

    # TODO enrich the personal data. Other examples below
    # if personal_data.get("birthDate"):
    #     personal_data = format_birthday(personal_data)
    # if personal_data.get("gender"):
    #     personal_data = format_gender(personal_data)
    # if personal_data.get("country"):
    #     personal_data = format_country(personal_data)
    # personal_data = {key: value.lower().strip() for key, value in personal_data.items()}

    var_data = {key: value.lower().strip() for key, value in var_data.items()}

    if len(snakemake.input) > 1:
        add_data = add_additional_personal_data(snakemake.input[1], var_data)
        save_personal_data(add_data, snakemake.output[0])
    else:
        save_personal_data(var_data, snakemake.output[0])
```
```
import os
import sys

import filetype
import pdf2image


def add_ext(paths_file: str, fixed_paths: str):
    with open(paths_file, "r") as path_list:
        with open(fixed_paths, "w") as new_paths:
            cwd = os.getcwd()
            accepted_ext = ["jpg", "jpeg", "tiff", "tif", "bmp"]
            ext_pairs = [{"jpg", "jpeg"}, {"tiff", "tif"}]
            for path in path_list:
                path = path.strip()
                filedir, filename = os.path.split(path)
                ext = filename.split(".")[-1]
                ftype = filetype.guess(path).extension
                if ext != ftype:
                    # file won't be written to the file-list for further processing
                    if ftype is None:
                        print(f"file {path} is in an incompatible file format.")
                    # convert pdf
                    elif ftype == "pdf":
                        print("pdf")
                        pages = pdf2image.convert_from_path(path)
                        for i, page in enumerate(pages):
                            new_paths.write(path + f"_{i}.tif\n")
                        os.chdir(os.path.join(cwd, filedir))
                        for i, page in enumerate(pages):
                            page.save(f"{filename}_{i}.tif", "TIFF")
                        os.chdir(cwd)
                        os.remove(path)
                    # change the img file type
                    elif ext not in accepted_ext and ftype in accepted_ext:
                        print("change extension")
                        new_paths.write(path + f".{ftype}\n")
                        os.chdir(os.path.join(cwd, filedir))
                        os.rename(filename, filename + f".{ftype}")
                        os.chdir(cwd)
                    # This elif clause leaves files with an alternative but adequate extension untouched
                    elif set((ext, ftype)) in ext_pairs:
                        print("set((ext, ftype)) in ext_pairs")
                        new_paths.write(path + "\n")
                    # This elif clause allows specifying what shall happen to the file in question.
                    # This will become relevant if we are going to allow non-img file-types like pdf.
                    # In this case the file needs to be channeled into another branch of the workflow.
                    elif ftype == "None" and ext in accepted_ext:
                        print(f"file {path} is not an image file.")
                # file extension equals the detected extension
                else:
                    print("else")
                    new_paths.write(path + "\n")


if __name__ == "__main__":
    sys.stderr = open(snakemake.log[0], "w")
    add_ext(snakemake.input.files, snakemake.output[0])
```
```
import json
import sys
import typing

import cv2
import Levenshtein
import pandas as pd
import pytesseract
from pytesseract import Output


def parse_page(
    image_path: str,
    out_path_all_text: str,
    out_path_personal_data: str,
    out_path_non_personal_data: str,
    personal_data_path: str,
    replacements_path: str,
    min_conf: float = 0.6,
    max_dist: int = 2,
):
    """Analyzes the passed image and identifies personal information on it.

    Args:
        image_path (str): path to the image
        out_path_all_text (str): path where all text should be written to
        out_path_personal_data (str): path where personal data should be written to
        out_path_non_personal_data (str): path where non-personal data should be written to
        personal_data_path (str): path to personal data that should be made unrecognizable
        replacements_path (str): path to replacement json
        min_conf (float, optional): minimal OCR confidence score. Defaults to 0.6.
        max_dist (int, optional): maximum Levenshtein distance of the found text on the image to the personal data. Defaults to 2.
    """
    img = cv2.imread(image_path)

    with open(personal_data_path) as json_file:
        personal_data = json.load(json_file)

    all_text = detect_text(img, min_conf)
    all_text.reset_index(inplace=True)
    all_text.to_csv(out_path_all_text, index=False, sep="\t")

    personal_text = select_personal_data(all_text, personal_data, max_dist)

    replace_and_save_personal_text(
        personal_text=personal_text,
        all_text=all_text,
        out_path_non_personal_data=out_path_non_personal_data,
        replacements_path=replacements_path,
        max_dist=max_dist,
    )

    personal_text.drop(columns=["index"], inplace=True)
    personal_text.to_csv(out_path_personal_data, index=False, sep="\t")


def replace_and_save_personal_text(
    personal_text: pd.DataFrame,
    all_text: pd.DataFrame,
    out_path_non_personal_data: str,
    replacements_path: str,
    max_dist: int = 2,
):
    """Replaces and saves text on the page.

    Args:
        personal_text (pd.DataFrame): DataFrame with identified personal data
        all_text (pd.DataFrame): DataFrame with all detected text
        out_path_non_personal_data (str): path to write the replaced text to
        replacements_path (str): path to the replacement json
        max_dist (int, optional): maximum Levenshtein distance. Defaults to 2.
    """
    personal_text = personal_text.copy()

    # remove personal data
    indices_to_remove = [
        ele.split(",") for ele in personal_text["index"].astype(str).values
    ]
    indices_to_remove = [
        int(float(item)) for sublist in indices_to_remove for item in sublist
    ]
    non_personal_text = all_text[~all_text["index"].isin(indices_to_remove)]

    # extract reason
    non_distance_columns = ["index", "left", "top", "width", "height", "conf", "text"]
    distance_columns = list(set(personal_text.columns) - set(non_distance_columns))

    personal_text["reason"] = ""
    for col in distance_columns:
        personal_text[col] = personal_text[col].mask(personal_text[col] > float(max_dist))
        personal_text[col] = personal_text[col].mask(personal_text[col] <= float(max_dist), col)
        personal_text["reason"] = personal_text["reason"] + personal_text[col].fillna("")

    personal_text = personal_text[["index", "reason"]]

    # insert replacements
    with open(replacements_path) as json_file:
        replacements = json.load(json_file)

    replaced_text = personal_text.copy().rename(columns={"reason": "text"})

    for key in replacements.keys():
        replaced_text["text"][personal_text["reason"].str.contains(key)] = replacements[key]

    # if no replacement was found, replace identified personal data with "PrivateDataPrivateData"
    replaced_text["text"][replaced_text["text"] == personal_text["reason"]] = "PrivateDataPrivateData"

    # append replacements to the whole text
    replaced_text["index"] = replaced_text["index"].astype(str)
    replaced_text["index"] = [x.split(",") for x in replaced_text["index"]]
    replaced_text = replaced_text.explode("index")
    replaced_text["index"] = replaced_text["index"].astype(float).astype(int)

    non_personal_text = non_personal_text.append(replaced_text, ignore_index=True)
    non_personal_text.sort_values(by=["index"], inplace=True)

    with open(out_path_non_personal_data, "w") as out_txt:
        out_txt.write(" ".join(non_personal_text["text"].values))


def detect_text(img: typing.Any, min_conf: float) -> pd.DataFrame:
    """Recognizes text on the image.

    Args:
        img (typing.Any): image with text to be recognized.
        min_conf (float): minimum OCR confidence score.

    Returns:
        pd.DataFrame: all found text on the image with text field data, filtered by min_conf.
    """
    # ocr
    detected_text_df = pytesseract.image_to_data(
        img, lang="deu", output_type=Output.DATAFRAME
    )

    # filter ocr table
    detected_text_df = detected_text_df[detected_text_df.conf >= min_conf]
    detected_text_df.drop(
        columns=["level", "page_num", "block_num", "par_num", "line_num", "word_num"],
        inplace=True,
    )
    detected_text_df.text = detected_text_df.text.astype(str)
    detected_text_df.text = detected_text_df.text.str.lower()

    return detected_text_df


def select_personal_data(
    detected_text_df: pd.DataFrame, personal_data: dict, max_dist: int
) -> pd.DataFrame:
    """Identifies personal data from the detected text.

    Args:
        detected_text_df (pd.DataFrame): detected text on the image.
        personal_data (dict): personal data to be masked out
        max_dist (int): maximum Levenshtein distance of the found text on the image to the personal data.

    Returns:
        pd.DataFrame: personal data with location on image, filtered by max_dist.
    """
    final_df = pd.DataFrame()

    max_spaces = max([value.count(" ") for value in personal_data.values()])

    for no_spaces in range(max_spaces + 1):
        tmp_df = detected_text_df.copy()
        tmp_df.rename(
            columns={"text": "text_0", "width": "width_0", "index": "index_0"},
            inplace=True,
        )

        # subset of personal data dict, according to # spaces
        tmp_dict = {
            key: value
            for key, value in personal_data.items()
            if value.count(" ") == no_spaces
        }

        # shift text to get longer phrases
        if no_spaces > 0:
            shift_colums = []
            for shift in range(1, no_spaces + 1):
                # shift index column and aggregate
                shift_colums.append("index_" + str(shift))
                highest_index_column_name = "index_" + str(shift)
                tmp_df[highest_index_column_name] = (
                    tmp_df["index_" + str(shift - 1)].astype(str)
                    + ","
                    + tmp_df.index_0.shift(-shift).fillna("").astype(str)
                )

                # shift text column and aggregate
                shift_colums.append("text_" + str(shift))
                highest_text_column_name = "text_" + str(shift)
                tmp_df[highest_text_column_name] = (
                    tmp_df["text_" + str(shift - 1)]
                    + " "
                    + tmp_df.text_0.shift(-shift).fillna("")
                )

                # shift width column and aggregate
                shift_colums.append("width_" + str(shift))
                highest_width_column_name = "width_" + str(shift)
                tmp_df[highest_width_column_name] = (
                    tmp_df["width_" + str(shift - 1)]
                    + tmp_df.width_0.shift(-shift).fillna(0)
                    + tmp_df.width_0 / tmp_df.text_0.str.len()
                )

            tmp_df["index_0"] = tmp_df[highest_index_column_name]
            tmp_df["text_0"] = tmp_df[highest_text_column_name]
            tmp_df["width_0"] = tmp_df[highest_width_column_name].astype(int)
            tmp_df.drop(columns=shift_colums, inplace=True)

        tmp_df.rename(
            columns={"text_0": "text", "width_0": "width", "index_0": "index"},
            inplace=True,
        )

        # calc edit distances for each key in the subsampled dict
        for key, value in tmp_dict.items():
            tmp_df[key] = tmp_df["text"].apply(
                lambda text: Levenshtein.distance(value, text)
            )

        # select entries where the distance is below or equal to a threshold
        query_list = []
        for col in tmp_dict.keys():
            query_list.append(f"{col}<={max_dist}")

        # if the query list is not empty, thus personal data was found,
        # then append the filtered df
        if query_list:
            tmp_df = tmp_df.query(" | ".join(query_list))
            final_df = final_df.append(tmp_df, ignore_index=True)

    return final_df


if __name__ == "__main__":
    sys.stderr = open(snakemake.log[0], "w")

    parse_page(
        image_path=snakemake.input.preprocessed_page,
        out_path_all_text=snakemake.output.all_text,
        out_path_personal_data=snakemake.output.text_to_redact,
        personal_data_path=snakemake.input.personal_data,
        out_path_non_personal_data=snakemake.output.non_personal_data,
        min_conf=snakemake.config["min-confidence"],
        max_dist=snakemake.config["max-distance"],
        replacements_path=snakemake.params["replacements"],
    )
```
```
import sys

sys.stderr = open(snakemake.log[0], "w")
# parameter = snakemake.params.get("parameter", "")

import altair as alt
import pandas as pd


def plot_manuel_check_summary(path_to_manuel_check_summary: str, out_path: str):
    source = pd.read_csv(path_to_manuel_check_summary, sep="\t")

    bars = (
        alt.Chart(source)
        .mark_bar()
        .encode(
            x="Count:Q",
            y="Check:O",
        )
    )

    text = bars.mark_text(
        align="left",
        baseline="middle",
        dx=3,  # Nudges text to the right so it doesn't appear on top of the bar
    ).encode(text="Count:Q")

    (bars + text).save(out_path)


if __name__ == "__main__":
    plot_manuel_check_summary(snakemake.input[0], snakemake.output[0])
```
```
import cv2
import numpy as np


# get grayscale image
def get_grayscale(image):
    return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)


# noise removal
def remove_noise(image):
    return cv2.medianBlur(image, 5)


# thresholding
def thresholding(image):
    return cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]


# opening - erosion followed by dilation
def opening(image):
    kernel = np.ones((5, 5), np.uint8)
    return cv2.morphologyEx(image, cv2.MORPH_OPEN, kernel)


# canny edge detection
def canny(image):
    return cv2.Canny(image, 100, 200)


# dilation
def dilate(image):
    kernel = np.ones((5, 5), np.uint8)
    return cv2.dilate(image, kernel, iterations=1)


# erosion
def erode(image):
    kernel = np.ones((5, 5), np.uint8)
    return cv2.erode(image, kernel, iterations=1)


# template matching
def match_template(image, template):
    return cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)


if __name__ == "__main__":
    sys.stderr = open(snakemake.log[0], "w")

    image = cv2.imread(snakemake.input[0])

    # TODO Check which preprocessing techniques deliver the best results
    processed_image = get_grayscale(image)
    # processed_image = remove_noise(processed_image)
    processed_image = thresholding(processed_image)
    # processed_image = opening(processed_image)
    # processed_image = canny(processed_image)

    # TODO add deskewing
    # image = deskew(image)

    cv2.imwrite(snakemake.output[0], processed_image)
```
```
import sys
import typing

import cv2
import pandas as pd


def process_page(image_path: str, out_path: str, data_to_redact: str, version: str):
    """Analyzes the passed image and removes personal information.

    Args:
        image_path (str): path to the image
        out_path (str): path where the redacted image should be written to
        data_to_redact (str): path to the personal data to be redacted
        version (str): version number of the workflow
    """
    df = pd.read_csv(data_to_redact, sep="\t")
    img = cv2.imread(image_path)

    img = add_watermark(img, version)
    img = redact(df, img)

    if not ".jpg" in out_path[-3:]:
        "".join([out_path, ".jpg"])

    cv2.imwrite(out_path, img)


def add_watermark(img: typing.Any, version: str) -> typing.Any:
    x, y = 50, 50
    watermark_text = "anonymized by DocNo {}".format(version)
    cv2.putText(img, watermark_text, (x, y), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
    return img


def redact(personal_data_df: pd.DataFrame, img: typing.Any) -> typing.Any:
    """Redacts personal data.

    Args:
        personal_data_df (pd.DataFrame): personal data with location on image.
        img (typing.Any): image with personal data on it.

    Returns:
        typing.Any: redacted image.
    """
    for i in personal_data_df.index:
        (x, y, w, h) = (
            int(personal_data_df.loc[i].left),
            int(personal_data_df.loc[i].top),
            int(personal_data_df.loc[i].width),
            int(personal_data_df.loc[i].height),
        )
        img = cv2.rectangle(img, (x, y), (x + w, y + h), (0, 0, 0), -1)

    return img


if __name__ == "__main__":
    sys.stderr = open(snakemake.log[0], "w")

    version = snakemake.params.get("version", "")

    process_page(
        image_path=snakemake.input.orginal_page,
        out_path=snakemake.output[0],
        data_to_redact=snakemake.input.data_to_redact,
        version=version,
    )
```
```
import os
import sys
import typing


def scan_folder(subfolder_path: str, writeable_file_object: typing.TextIO):
    ignore = [".snakemake_timestamp", ".DS_Store"]
    for entry in os.scandir(subfolder_path):
        if entry.is_dir(follow_symlinks=False):
            scan_folder(entry.path, writeable_file_object)
        elif entry.is_file() and not any(
            ignore_element in entry.path for ignore_element in ignore
        ):
            writeable_file_object.write(f"{entry.path}\n")
        else:
            pass


def recursive_folder_scan(decomp_data_dir: str, results_csv_paths: str):
    with open(results_csv_paths, "w") as paths_csv:
        scan_folder(decomp_data_dir, paths_csv)


if __name__ == "__main__":
    sys.stderr = open(snakemake.log[0], "w")
    recursive_folder_scan(snakemake.input[0], snakemake.output[0])
```
```
import sys

sys.stderr = open(snakemake.log[0], "w")

from collections import defaultdict

import pandas as pd


def summarize_found_personal_data(
    data_path_list: list[str], img_path_list: list[str], sm_output: str, max_dist: int
):
    """Summarizes the found personal data. Saves the summary as a tsv-file.

    Args:
        data_path_list (list[str]): paths of identified personal data.
        img_path_list (list[str]): paths to redacted images.
        sm_output (str): path to write the summary to.
        max_dist (int): maximal Levenshtein distance.
    """
    summary_list = []

    for data_path, img_path in zip(data_path_list, img_path_list):
        found_data_df = pd.read_csv(data_path, sep="\t")

        page_summary = defaultdict()
        page_summary["processed img"] = img_path
        page_summary["# personal data"] = found_data_df.shape[0]

        tesseract_output = {"left", "top", "width", "height", "conf", "text"}
        personal_data_columns = set(found_data_df.columns) - tesseract_output

        for column in personal_data_columns:
            no_found_data = found_data_df[found_data_df[column] <= max_dist][
                column
            ].shape[0]
            if no_found_data > 0:
                page_summary[column] = no_found_data

        summary_list.append(page_summary)

    pd.DataFrame(summary_list).to_csv(sm_output, index=False, sep="\t")


if __name__ == "__main__":
    summarize_found_personal_data(
        data_path_list=snakemake.input.data,
        img_path_list=snakemake.input.pages,
        sm_output=snakemake.output[0],
        max_dist=snakemake.config["max-distance"],
    )
```
```
import sys

sys.stderr = open(snakemake.log[0], "w")
# parameter = snakemake.params.get("parameter", "")

from collections import defaultdict
from os.path import basename, splitext

import pandas as pd


def summarize_manuel_checks(
    paths_to_manuell_check_files: list[str], path_to_total_summary: str, out_path: str
):
    summary_dict = defaultdict()
    summary_dict["total pages processed"] = pd.read_csv(
        path_to_total_summary, sep="\t"
    ).shape[0]

    for path in paths_to_manuell_check_files:
        header = splitext(basename(path))[0].replace("_", " ").replace("-", " ")
        count = pd.read_csv(path, sep="\t", names=[header]).shape[0]
        summary_dict[header] = count

    manuel_check_summary_df = pd.DataFrame(
        summary_dict.items(), columns=["Check", "Count"]
    )
    manuel_check_summary_df.to_csv(out_path, sep="\t", index=False)


if __name__ == "__main__":
    summarize_manuel_checks(
        snakemake.input.manuel_checks,
        snakemake.input.total_imgs_processed,
        snakemake.output[0],
    )
```