Personal data redaction on images based on FHIR patient resources.


This workflow redacts personal information on given images. The personal information must be provided as a FHIR Patient resource.
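
The extraction script looks for a Patient resource either directly or inside the entry list of a FHIR Bundle. As a minimal sketch, the JSON shape it reads (name, birthDate, address, telecom) looks roughly like the following; all values are placeholders, and the case number is derived from the file name:

{
  "resourceType": "Patient",
  "name": [{"family": "Mustermann", "given": ["Max"]}],
  "birthDate": "1970-01-01",
  "address": [{"line": ["Musterstr. 1"], "postalCode": "45127", "city": "Essen"}],
  "telecom": [{"system": "phone", "value": "0201 1234567"}]
}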

Authors

  • Thomas Battenfeld (@thomasbtf)

  • Simon Magin (@simakro)

  • Josefa Welling (@josefawelling)

  • Christin Seifert

  • Folker Meyer (@folker)

Usage

Step 1: Obtain a copy of this workflow

If you simply want to use this workflow, download and extract the latest release. If you intend to modify and further extend this workflow or want to work under version control, fork this repository as outlined in Advanced. The latter way is recommended.

In any case, if you use this workflow in a paper, don't forget to give credit to the authors by citing the URL of this repository and, if available, its DOI (see above).

Step 2: Configure workflow

Configure the workflow according to your needs by editing the files in the config/ folder. Adjust config/config.yaml to configure the workflow execution, and config/pep/documents.csv to specify your documents and metadata.
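
The full set of options is defined by the config/config.yaml shipped with the repository. As a minimal, hedged sketch, the scripts in this workflow read the OCR thresholds from the top-level keys below (the values shown are the script defaults):

min-confidence: 0.6
max-distance: 2

For config/pep/documents.csv, keep the column layout of the file shipped with the repository and list one document per row.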

Step 3: Install Snakemake

Install Snakemake using conda:

conda create -c bioconda -c conda-forge -n snakemake snakemake

For installation details, see the instructions in the Snakemake documentation.
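
If the conda solver is slow, the Snakemake documentation also recommends installing via mamba; a sketch of that route:

conda install -n base -c conda-forge mamba
mamba create -c conda-forge -c bioconda -n snakemake snakemake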

Step 4: Execute workflow

Activate the conda environment:

conda activate snakemake

Test your configuration by performing a dry-run via

snakemake --use-conda -n

Then execute the workflow with $N cores via

snakemake --use-conda --cores $N

If you want to fix not only the software stack but also the underlying OS, use

snakemake --use-conda --use-singularity

in combination with the modes above. See the Snakemake documentation for further details.
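
For example, a fully containerized run with $N cores could look like this (assuming Singularity/Apptainer is available on the machine):

snakemake --use-conda --use-singularity --cores $N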

Step 5: Investigate results

After successful execution, you can create a self-contained interactive HTML report with all results via:

snakemake --report report.zip

This report can, e.g., be forwarded to your collaborators. An example (using some trivial test data) can be seen here.

Advanced

The following recipe provides established best practices for running and extending this workflow in a reproducible way. A command-line sketch of the first few steps follows the list.

  1. Fork the repo to a personal or lab account.

  2. Clone the fork to the desired working directory for the concrete project/run on your machine.

  3. Create a new branch (the project-branch) within the clone and switch to it. The branch will contain any project-specific modifications (e.g. to configuration, but also to code).

  4. Modify the config, and any necessary sheets (and probably the workflow) as needed.

  5. Commit any changes and push the project-branch to your fork on GitHub.

  6. Run the analysis.

  7. Optional: Merge back any valuable and generalizable changes to the upstream repo via a pull request. This would be greatly appreciated.

  8. Optional: Push results (plots/tables) to the remote branch on your fork.

  9. Optional: Create a self-contained workflow archive for publication along with the paper (snakemake --archive).

  10. Optional: Delete the local clone/workdir to free space.
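
As a hedged command-line sketch of steps 1-6 (the fork itself is created in the GitHub web interface; <your-account> and the branch name are placeholders):

git clone git@github.com:<your-account>/document-anonymization.git
cd document-anonymization
git checkout -b my-project
# edit config/config.yaml, config/pep/documents.csv and, if needed, the workflow
git commit -am "configure project run"
git push -u origin my-project
snakemake --use-conda --cores 4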

Testing

Test cases are in the subfolder .test. They are automatically executed via continuous integration with GitHub Actions.
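
One way to run them locally before relying on CI (a sketch, assuming the test configuration under .test is self-contained):

snakemake --use-conda --cores 2 --directory .test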

Code Snippets

script:
    "../scripts/extract-personal-data.py"

shell:
    "(mkdir -p {output} && lz4 -dc --no-sparse {input} | tar -xf - -C {output}) 2> {log}"

shell:
    '(unzip "{input}" -d "{output}") > "{log}" 2>&1'

shell:
    "(mkdir -p {output} && cp -r {params.in_dir}/* {output}) 2> {log}"

script:
    "../scripts/scan_decomp_data.py"

script:
    "../scripts/fix_filenames.py"

script:
    "../scripts/summarize-found-personal-data.py"

script:
    "../scripts/create-paths-for-manually-checking.py"

shell:
    "(cp '{input}' '{output}') 2> '{log}'"

shell:
    "(cp '{input}' '{output}') 2> '{log}'"

shell:
    "(cp '{input}' '{output}') 2> '{log}'"

shell:
    "(cp '{input}' '{output}') 2> '{log}'"

shell:
    "(rm {params.escaped_input}) 2> {log}"

script:
    "../scripts/summarize-manuel-checks.py"

script:
    "../scripts/plot-manuel-check-summary.py"

script:
    "../scripts/preprocess-page.py"

script:
    "../scripts/identify-personal-data.py"

script:
    "../scripts/redact-page.py"
import sys

sys.stderr = open(snakemake.log[0], "w")

import pandas as pd


def save_df(df: pd.DataFrame, out_path: str):
    df.to_csv(out_path, sep="\t", index=False, header=False)


def no_redaction(summary_df: pd.DataFrame, out_path: str):
    save_df(summary_df[summary_df["# personal data"] == 0][["processed img"]], out_path)


def high_degree_of_redaction(summary_df: pd.DataFrame, out_path: str):
    save_df(
        summary_df[summary_df["# personal data"] >= 10][["processed img"]], out_path
    )


def partly_found_address(summary_df: pd.DataFrame, out_path: str):
    if "city" in summary_df.columns and "address" in summary_df.columns:
        df = summary_df[summary_df["city"] != summary_df["address"]][["processed img"]]
    else:
        df = pd.DataFrame(columns=["processed img"])

    save_df(df, out_path)


def partly_found_name(summary_df: pd.DataFrame, out_path: str):
    if "name_family" in summary_df.columns and "name_first_0" in summary_df.columns:
        df = summary_df[summary_df["name_family"] != summary_df["name_first_0"]][
            ["processed img"]
        ]
    else:
        df = pd.DataFrame(columns=["processed img"])

    save_df(df, out_path)


if __name__ == "__main__":
    summary_df = pd.read_csv(snakemake.input[0], sep="\t")
    summary_df.fillna(999999999.0, inplace=True)
    no_redaction(summary_df, snakemake.output.no_redaction)
    high_degree_of_redaction(summary_df, snakemake.output.high_degree_of_redaction)
    partly_found_address(summary_df, snakemake.output.partly_found_address)
    partly_found_name(summary_df, snakemake.output.partly_found_name)
import sys

sys.stderr = open(snakemake.log[0], "w")

import itertools
import json
from collections import defaultdict


def parse_meta_data(json_path: str) -> defaultdict:
    """Parses the FHIR metadata and extracts personal data.
    The extracted data is redacted in the further course of the workflow.

    Args:
        json_path (str): path to FHIR metadata

    Returns:
        defaultdict: personal data of the patient
    """

    with open(json_path) as json_file:
        data = json.load(json_file)

    # select the patient resource from the bundle data export
    for ele in data.get("entry", {}):
        # iterate over the entries
        for key, value in ele.get("resource", {}).items():
            if key == "resourceType" and value == "Patient":
                data = ele.get("resource")
                break

    # TODO make this part more flexible, maybe via the snakemake config file
    # ---------------------------------------
    personal_data = defaultdict()
    first_name_count = 0
    for i, first_name in enumerate(data.get("name")[0].get("given")):
        first_name_count += 1
        personal_data["name_first_{}".format(i)] = first_name
    personal_data["name_family"] = data.get("name")[0].get("family")
    personal_data["birthDate"] = data.get("birthDate")
    personal_data["address"] = data.get("address")[0].get("line")[0]
    personal_data["city"] = " ".join(
        [data.get("address")[0].get("postalCode"), data.get("address")[0].get("city")]
    )
    personal_data["case_number"] = json_path.split("/")[-1].split(".")[0]
    for com in data.get("telecom", {}):
        com_type = com.get("system", {})
        personal_data[com_type] = com.get("value", {})
    # personal_data["gender"] = data.get("gender")
    # personal_data["country"] = data.get("address")[0].get("country")
    # ---------------------------------------
    return personal_data, first_name_count


def variate_personal_data(personal_data: dict, first_name_count: int) -> defaultdict:
    # permute the names
    names_simple = set((personal_data["name_first_0"], personal_data["name_family"]))
    names_all = set()
    for i in range(first_name_count):
        names_all.add(personal_data["name_first_{}".format(i)])
    names_all.add(personal_data["name_family"])

    name_perms = list(itertools.permutations(list(names_simple)))
    if names_simple != names_all:
        names_all_perm = list(itertools.permutations(list(names_all)))
        name_perms.extend(names_all_perm)

    for i, perm in enumerate(name_perms):
        personal_data[f"name_perm_{i}"] = ",".join(perm)

    # variate phone number
    provider_local_codes = [
        "01511",
        "01512",
        "01514",
        "01515",
        "01516",
        "01517",
        "01520",
        "01522",
        "01523",
        "01525",
        "015566",
        "01570",
        "01573",
        "01575",
        "01577",
        "01578",
        "01590",
        "0160",
        "0162",
        "0163",
        "0170",
        "0171",
        "0172",
        "0173",
        "0174",
        "0175",
        "0176",
        "0177",
        "0178",
        "0179",
    ]

    # it would be better to generate this list once centrally instead of regenerating it for every patient sample
    with open("resources/Vorwahlen_Festnetz_Bundesnetzagentur.csv", "r") as local_codes:
        for line in local_codes:
            if line.startswith("Ortsnetzkennzahl"):
                pass
            else:
                provider_local_codes.append("0" + line.split(";")[0])

    nums = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "0"]
    tmp_phone = personal_data.get("phone", "")
    for letter in tmp_phone:
        if letter not in nums:
            tmp_phone = tmp_phone.replace(letter, "")
    personal_data["phone_perm0"] = tmp_phone

    for code in provider_local_codes:
        if tmp_phone.startswith(code):
            pre_code = code
            break
        else:
            pre_code = tmp_phone[:4]

    seperators = ["/", "\\", "-", " ", "_", ".", ":"]

    for i, sep in enumerate(seperators):
        personal_data[f"phone_perm{i+1}"] = (
            tmp_phone[: len(pre_code) + 1] + sep + tmp_phone[len(pre_code) + 1 :]
        )

    # variate birthdate
    yr, m, dy = personal_data["birthDate"].split("-")
    for i, sep in enumerate(seperators):
        personal_data[f"birthDate_perm{i}"] = f"{dy}{sep}{m}{sep}{yr}"
        personal_data[f"birthDate_perm{i}"] = f"*{dy}{sep}{m}{sep}{yr}"
        personal_data[f"birthDate_perm{i}{i}"] = f"{yr}{sep}{m}{sep}{dy}"

    # variate country

    return personal_data

def add_additional_personal_data(add_json_path: str, personal_data: dict) -> defaultdict:
    # if an additional data file exists, its data is added to the personal data dict
    with open(add_json_path) as json_file:
        additional_data = json.load(json_file)
    personal_data.update(additional_data)

    return personal_data

def save_personal_data(personal_data: dict, out_path: str):
    """Save the final dic with the personal data as json.

    Args:
        personal_data (dict): dict with the personal data, that is to be removed
        out_path (str): path to save the json to
    """

    with open(out_path, "w") as fp:
        json.dump(personal_data, fp, indent=2)


if __name__ == "__main__":
    personal_data = parse_meta_data(snakemake.input[0])
    var_data = variate_personal_data(personal_data[0], personal_data[1])
    # TODO enrich the personal data. Other examples below
    # if personal_data.get("birthDate"):
    #     personal_data = format_birthday(personal_data)

    # if personal_data.get("gender"):
    #     personal_data = format_gender(personal_data)

    # if personal_data.get("country"):
    #     personal_data = format_country(personal_data)

    # personal_data = {key: value.lower().strip() for key, value in personal_data.items()}
    var_data = {key: value.lower().strip() for key, value in var_data.items()}

    if len(snakemake.input) > 1:
        add_data = add_additional_personal_data(snakemake.input[1], var_data)
        save_personal_data(add_data, snakemake.output[0])
    else:
        save_personal_data(var_data, snakemake.output[0])
import os
import sys
import filetype
import pdf2image


def add_ext(paths_file: str, fixed_paths: str):
    with open(paths_file, "r") as path_list:
        with open(fixed_paths, "w") as new_paths:
            cwd = os.getcwd()
            accepted_ext = ["jpg", "jpeg", "tiff", "tif", "bmp"]
            ext_pairs = [{"jpg", "jpeg"}, {"tiff", "tif"}]

            for path in path_list:
                path = path.strip()
                filedir, filename = os.path.split(path)
                ext = filename.split(".")[-1]
                kind = filetype.guess(path)
                ftype = kind.extension if kind is not None else None

                if ext != ftype:
                    # file won't be written to the file list for further processing
                    if ftype is None:
                        print(f"file {path} is in an incompatible file format.")

                    # convert pdf
                    elif ftype == "pdf":
                        print("pdf")
                        pages = pdf2image.convert_from_path(path)
                        for i, page in enumerate(pages):
                            new_paths.write(path + f"_{i}.tif\n")
                        os.chdir(os.path.join(cwd, filedir))
                        for i, page in enumerate(pages):
                            page.save(f"{filename}_{i}.tif", "TIFF")
                        os.chdir(cwd)
                        os.remove(path)

                    # change the img file type
                    elif ext not in accepted_ext and ftype in accepted_ext:
                        print("change extension")
                        new_paths.write(path + f".{ftype}\n")
                        os.chdir(os.path.join(cwd, filedir))
                        os.rename(filename, filename + f".{ftype}")
                        os.chdir(cwd)

                    # This elif clause leaves files with an alternative but adequate extension untouched
                    elif set((ext, ftype)) in ext_pairs:
                        print("set((ext, ftype)) in ext_pairs")
                        new_paths.write(path + "\n")

                    # This elif clause allows specifying what should happen to the file in question.
                    # This will become relevant once non-image file types like PDF are allowed.
                    # In that case the file needs to be channeled into another branch of the workflow.
                    elif ftype is None and ext in accepted_ext:
                        print(f"file {path} is not an image file.")

                # file extension equals the detected extension
                else:
                    print("else")
                    new_paths.write(path + "\n")


if __name__ == "__main__":
    sys.stderr = open(snakemake.log[0], "w")
    add_ext(snakemake.input.files, snakemake.output[0])
import sys
import typing
import json

import cv2
import Levenshtein
import pandas as pd
import pytesseract
from pytesseract import Output


def parse_page(
    image_path: str,
    out_path_all_text: str,
    out_path_personal_data: str,
    out_path_non_personal_data: str,
    personal_data_path: str,
    replacements_path: str,
    min_conf: float = 0.6,
    max_dist: int = 2,
):
    """Analyzes the passed image and identifies personal information on it.

    Args:
        image_path (str): path to the image
        out_path_all_text (str): path where all text should be written to
        out_path_personal_data (str): path where personal data should be written to
        out_path_non_personal_data (str): path where non personal data should be written to
        personal_data_path (str): path to the personal data that should be made unrecognizable
        replacements_path (str): path to the replacement JSON
        min_conf (float, optional): minimal OCR confidence score. Defaults to 0.6.
        max_dist (int, optional): maximum Levenshtein distance of the found text on the image to the personal data. Defaults to 2.
    """

    img = cv2.imread(image_path)

    with open(personal_data_path) as json_file:
        personal_data = json.load(json_file)

    all_text = detect_text(img, min_conf)
    all_text.reset_index(inplace=True)
    all_text.to_csv(out_path_all_text, index=False, sep="\t")

    personal_text = select_personal_data(all_text, personal_data, max_dist)

    replace_and_save_personal_text(
        personal_text=personal_text,
        all_text=all_text,
        out_path_non_personal_data=out_path_non_personal_data,
        replacements_path=replacements_path,
        max_dist=max_dist,
    )

    personal_text.drop(columns=["index"], inplace=True)
    personal_text.to_csv(out_path_personal_data, index=False, sep="\t")


def replace_and_save_personal_text(
    personal_text: pd.DataFrame,
    all_text: pd.DataFrame,
    out_path_non_personal_data: str,
    replacements_path: str,
    max_dist: int = 2,
):
    """Replaces and saves the text of a page.

    Args:
        personal_text (pd.DataFrame): DataFrame with the identified personal text.
        all_text (pd.DataFrame): DataFrame with all detected text.
        out_path_non_personal_data (str): Path to write the replaced text to.
        replacements_path (str): Path to the replacement JSON.
        max_dist (int, optional): Maximum Levenshtein distance. Defaults to 2.
    """
    personal_text = personal_text.copy()
    # remove personal data
    indices_to_remove = [
        ele.split(",") for ele in personal_text["index"].astype(str).values
    ]
    indices_to_remove = [
        int(float(item)) for sublist in indices_to_remove for item in sublist
    ]
    non_personal_text = all_text[~all_text["index"].isin(indices_to_remove)]

    # extract reason
    non_distance_columns = ["index", "left", "top", "width", "height", "conf", "text"]
    distance_columns = list(set(personal_text.columns) - set(non_distance_columns))

    personal_text["reason"] = ""
    for col in distance_columns:
        personal_text[col] = personal_text[col].mask(personal_text[col] > float(max_dist))
        personal_text[col] = personal_text[col].mask(
            personal_text[col] <= float(max_dist), col
        )
        personal_text["reason"] = personal_text["reason"] + personal_text[col].fillna("")

    personal_text = personal_text[["index", "reason"]]

    # insert replacements
    with open(replacements_path) as json_file:
        replacements = json.load(json_file)

    replaced_text = personal_text.copy().rename(columns={"reason": "text"})
    for key in replacements.keys():
        replaced_text.loc[personal_text["reason"].str.contains(key), "text"] = replacements[key]

    # if no replacement was found, replace identified personal data with "PrivateDataPrivateData"
    replaced_text.loc[replaced_text["text"] == personal_text["reason"], "text"] = "PrivateDataPrivateData"

    # append the replacements to the whole text
    replaced_text["index"] = replaced_text["index"].astype(str)
    replaced_text["index"] = [x.split(",") for x in replaced_text["index"]]
    replaced_text = replaced_text.explode("index")
    replaced_text["index"] = replaced_text["index"].astype(float).astype(int)
    non_personal_text = non_personal_text.append(replaced_text, ignore_index=True)
    non_personal_text.sort_values(by=["index"], inplace=True)

    with open(out_path_non_personal_data, "w") as out_txt:
        out_txt.write(" ".join(non_personal_text["text"].values))


def detect_text(img: typing.Any, min_conf: float) -> pd.DataFrame:
    """Recognizes text on the image.

    Args:
        img (typing.Any): image with text to be recognized.
        min_conf (float): minimum OCR Confidence Scores.

    Returns:
        pd.DataFrame: all found text on image with text field data, filtered by min_conf.
    """

    # ocr
    detected_text_df = pytesseract.image_to_data(
        img, lang="deu", output_type=Output.DATAFRAME
    )

    # filter ocr table
    detected_text_df = detected_text_df[detected_text_df.conf >= min_conf]
    detected_text_df.drop(
        columns=["level", "page_num", "block_num", "par_num", "line_num", "word_num"],
        inplace=True,
    )

    detected_text_df.text = detected_text_df.text.astype(str)
    detected_text_df.text = detected_text_df.text.str.lower()

    return detected_text_df


def select_personal_data(
    detected_text_df: pd.DataFrame, personal_data: dict, max_dist: int
) -> pd.DataFrame:
    """Identifies personal data from the detected text.

    Args:
        detected_text_df (pd.DataFrame): detected text on the image.
        personal_data (dict): personal data to be masked out
        max_dist (int): maximum Levenshtein distance of the found text on the image to the personal data.

    Returns:
        pd.DataFrame: person data with location on image, filtered by max_dist.
    """
    final_df = pd.DataFrame()

    max_spaces = max([value.count(" ") for value in personal_data.values()])
    for no_spaces in range(max_spaces + 1):
        tmp_df = detected_text_df.copy()
        tmp_df.rename(
            columns={"text": "text_0", "width": "width_0", "index": "index_0"},
            inplace=True,
        )

        # subset of personal data dict, according to # spaces
        tmp_dict = {
            key: value
            for key, value in personal_data.items()
            if value.count(" ") == no_spaces
        }

        # shift text to get longer phrases
        if no_spaces > 0:
            shift_colums = []
            for shift in range(1, no_spaces + 1):
                # shift the index column and aggregate
                shift_colums.append("index_" + str(shift))
                highest_index_column_name = "index_" + str(shift)
                tmp_df[highest_index_column_name] = (
                    tmp_df["index_" + str(shift - 1)].astype(str)
                    + ","
                    + tmp_df.index_0.shift(-shift).fillna("").astype(str)
                )

                # shift the text column and aggregate
                shift_colums.append("text_" + str(shift))
                highest_text_column_name = "text_" + str(shift)
                tmp_df[highest_text_column_name] = (
                    tmp_df["text_" + str(shift - 1)]
                    + " "
                    + tmp_df.text_0.shift(-shift).fillna("")
                )

                # shift the width column and aggregate
                shift_colums.append("width_" + str(shift))
                highest_width_column_name = "width_" + str(shift)
                tmp_df[highest_width_column_name] = (
                    tmp_df["width_" + str(shift - 1)]
                    + tmp_df.width_0.shift(-shift).fillna(0)
                    + tmp_df.width_0 / tmp_df.text_0.str.len()
                )

            tmp_df["index_0"] = tmp_df[highest_index_column_name]
            tmp_df["text_0"] = tmp_df[highest_text_column_name]
            tmp_df["width_0"] = tmp_df[highest_width_column_name].astype(int)
            tmp_df.drop(columns=shift_colums, inplace=True)

        tmp_df.rename(
            columns={"text_0": "text", "width_0": "width", "index_0": "index"},
            inplace=True,
        )

        # calc edit distances for each key in subsampled dict
        for key, value in tmp_dict.items():
            tmp_df[key] = tmp_df["text"].apply(
                lambda text: Levenshtein.distance(value, text)
            )

        # select entries where the distance is below or equal to the threshold
        query_list = []
        for col in tmp_dict.keys():
            query_list.append(f"{col}<={max_dist}")

        # if the query list is not empty, i.e. personal data was found,
        # append the filtered df
        if query_list:
            tmp_df = tmp_df.query(" | ".join(query_list))
            final_df = final_df.append(tmp_df, ignore_index=True)

    return final_df


if __name__ == "__main__":
    sys.stderr = open(snakemake.log[0], "w")
    parse_page(
        image_path=snakemake.input.preprocessed_page,
        out_path_all_text=snakemake.output.all_text,
        out_path_personal_data=snakemake.output.text_to_redact,
        personal_data_path=snakemake.input.personal_data,
        out_path_non_personal_data=snakemake.output.non_personal_data,
        min_conf=snakemake.config["min-confidence"],
        max_dist=snakemake.config["max-distance"],
        replacements_path=snakemake.params["replacements"]
    )
import sys

sys.stderr = open(snakemake.log[0], "w")
# parameter = snakemake.params.get("parameter", "")

import altair as alt
import pandas as pd


def plot_manuel_check_summary(path_to_manuel_check_summary: str, out_path: str):
    source = pd.read_csv(path_to_manuel_check_summary, sep="\t")

    bars = (
        alt.Chart(source)
        .mark_bar()
        .encode(
            x="Count:Q",
            y="Check:O",
        )
    )

    text = bars.mark_text(
        align="left",
        baseline="middle",
        dx=3,  # Nudges text to right so it doesn't appear on top of the bar
    ).encode(text="Count:Q")

    (bars + text).save(out_path)


if __name__ == "__main__":
    plot_manuel_check_summary(snakemake.input[0], snakemake.output[0])
import sys

import cv2
import numpy as np


# get grayscale image
def get_grayscale(image):
    return cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)


# noise removal
def remove_noise(image):
    return cv2.medianBlur(image, 5)


# thresholding
def thresholding(image):
    return cv2.threshold(image, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]


# opening - erosion followed by dilation
def opening(image):
    kernel = np.ones((5, 5), np.uint8)
    return cv2.morphologyEx(image, cv2.MORPH_OPEN, kernel)


# canny edge detection
def canny(image):
    return cv2.Canny(image, 100, 200)


# dilation
def dilate(image):
    kernel = np.ones((5, 5), np.uint8)
    return cv2.dilate(image, kernel, iterations=1)


# erosion
def erode(image):
    kernel = np.ones((5, 5), np.uint8)
    return cv2.erode(image, kernel, iterations=1)


# template matching
def match_template(image, template):
    return cv2.matchTemplate(image, template, cv2.TM_CCOEFF_NORMED)


if __name__ == "__main__":

    sys.stderr = open(snakemake.log[0], "w")

    image = cv2.imread(snakemake.input[0])

    # TODO Check which preprocessing techniques deliver the best results
    processed_image = get_grayscale(image)
    # processed_image = remove_noise(processed_image)
    processed_image = thresholding(processed_image)
    # processed_image = opening(processed_image)
    # processed_image = canny(processed_image)

    # TODO add deskewing
    # image = deskew(image)

    cv2.imwrite(snakemake.output[0], processed_image)
import sys
import typing

import cv2
import pandas as pd


def process_page(image_path: str, out_path: str, data_to_redact: str, version: str):
    """Analyzes the passed image and removes personal information.

    Args:
        image_path (str): path to the image
        out_path (str): path where the redacted image should be written to
        data_to_redact (str): path to the table of personal text to redact
        version (str): version number of the workflow
    """

    df = pd.read_csv(data_to_redact, sep="\t")
    img = cv2.imread(image_path)

    img = add_watermark(img, version)
    img = redact(df, img)

    if not ".jpg" in out_path[-3:]:
        "".join([out_path, ".jpg"])

    cv2.imwrite(out_path, img)


def add_watermark(img: typing.Any, version: str) -> typing.Any:
    x, y = 50, 50
    watermark_text = "anonymized by DocNo {}".format(version)
    cv2.putText(img, watermark_text, (x, y), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)
    return img


def redact(personal_data_df: pd.DataFrame, img: typing.Any) -> typing.Any:
    """Redacts personal data.

    Args:
        personal_data_df (pd.DataFrame): personal data with location on image.
        img (typing.Any): image with personal data on it.

    Returns:
        typing.Any: redacted image.
    """

    for i in personal_data_df.index:
        (x, y, w, h) = (
            int(personal_data_df.loc[i].left),
            int(personal_data_df.loc[i].top),
            int(personal_data_df.loc[i].width),
            int(personal_data_df.loc[i].height),
        )
        img = cv2.rectangle(img, (x, y), (x + w, y + h), (0, 0, 0), -1)
    return img


if __name__ == "__main__":
    sys.stderr = open(snakemake.log[0], "w")
    version = snakemake.params.get("version", "")
    process_page(
        image_path=snakemake.input.orginal_page,
        out_path=snakemake.output[0],
        data_to_redact=snakemake.input.data_to_redact,
        version=version,
    )
import os
import sys
import typing


def scan_folder(subfolder_path: str, writeable_file_object: typing.TextIO):
    ignore = [".snakemake_timestamp", ".DS_Store"]

    for entry in os.scandir(subfolder_path):
        if entry.is_dir(follow_symlinks=False):
            scan_folder(entry.path, writeable_file_object)
        elif entry.is_file() and not any(
            ignore_element in entry.path for ignore_element in ignore
        ):
            writeable_file_object.write(f"{entry.path}\n")
        else:
            pass


def recursive_folder_scan(decomp_data_dir: str, results_csv_paths: str):
    with open(results_csv_paths, "w") as paths_csv:
        scan_folder(decomp_data_dir, paths_csv)


if __name__ == "__main__":
    sys.stderr = open(snakemake.log[0], "w")
    recursive_folder_scan(snakemake.input[0], snakemake.output[0])
import sys

sys.stderr = open(snakemake.log[0], "w")

from collections import defaultdict

import pandas as pd


def summarize_found_personal_data(
    data_path_list: list[str], img_path_list: list[str], sm_output: str, max_dist: int
):
    """Summarizes the found personal data. Saves the summary as a tsv-file.

    Args:
        data_path_list (list[str]): Paths to the identified personal data tables.
        img_path_list (list[str]): Paths to the redacted images.
        sm_output (str): Path to write the summary to.
        max_dist (int): Maximal Levenshtein distance.
    """
    summary_list = []
    for data_path, img_path in zip(data_path_list, img_path_list):

        found_data_df = pd.read_csv(data_path, sep="\t")
        page_summary = defaultdict()

        page_summary["processed img"] = img_path
        page_summary["# personal data"] = found_data_df.shape[0]

        tesseract_output = {"left", "top", "width", "height", "conf", "text"}
        personal_data_columns = set(found_data_df.columns) - tesseract_output
        for column in personal_data_columns:
            no_found_data = found_data_df[found_data_df[column] <= max_dist][
                column
            ].shape[0]
            if no_found_data > 0:
                page_summary[column] = no_found_data

        summary_list.append(page_summary)

    pd.DataFrame(summary_list).to_csv(sm_output, index=False, sep="\t")


if __name__ == "__main__":
    summarize_found_personal_data(
        data_path_list=snakemake.input.data,
        img_path_list=snakemake.input.pages,
        sm_output=snakemake.output[0],
        max_dist=snakemake.config["max-distance"],
    )
import sys

sys.stderr = open(snakemake.log[0], "w")
# parameter = snakemake.params.get("parameter", "")

from collections import defaultdict
from os.path import basename, splitext

import pandas as pd


def summarize_manuel_checks(
    paths_to_manuell_check_files: list[str], path_to_total_summary: str, out_path: str
):
    summary_dict = defaultdict()

    summary_dict["total pages processed"] = pd.read_csv(
        path_to_total_summary, sep="\t"
    ).shape[0]

    for path in paths_to_manuell_check_files:
        header = splitext(basename(path))[0].replace("_", " ").replace("-", " ")
        count = pd.read_csv(path, sep="\t", names=[header]).shape[0]
        summary_dict[header] = count

    manuel_check_summary_df = pd.DataFrame(
        summary_dict.items(), columns=["Check", "Count"]
    )
    manuel_check_summary_df.to_csv(out_path, sep="\t", index=False)


if __name__ == "__main__":
    summarize_manuel_checks(
        snakemake.input.manuel_checks,
        snakemake.input.total_imgs_processed,
        snakemake.output[0],
    )
Name: document-anonymization
URL: https://github.com/thomasbtf/document-anonymization
Version: latest
Maintainers: public
Copyright: Public Domain
License: Apache License 2.0