
pulibrary / pymarc_dedupe
Build 61497a28-c542-4291-a5f4-fb039d7100c5 (circleci)

26 May 2025 06:06PM UTC coverage: 99.281% (-0.7%) from 100.0%

Pull Request #24: Use postgres database version for very large data sets
Committer maxkadel: "Increase test coverage again"

268 of 274 new or added lines in 10 files covered (97.81%)
2 existing lines in 1 file now uncovered
828 of 834 relevant lines covered (99.28%)
0.99 hits per line

Source File: /src/db_dedupe_records.py (97.06% line coverage)
import threading
from os import listdir
from os.path import isfile, join
import time
import dedupe
import psycopg2
import psycopg2.extras
from config import settings
from src.marc_to_db import MarcToDb
from src.machine_learning_model import MachineLearningModel
from src.readable import Readable

RECORD_SELECT = """SELECT
                    id, title, author, publication_year, pagination, edition, publisher_name, type_of, is_electronic_resource
                    FROM records;
                """


class DbDedupeRecords(MachineLearningModel):
    def __init__(self, input_directory, output_directory, match_threshold=0.5):
        MarcToDb.find_or_create_table()
        super().__init__(output_directory, match_threshold)
        directory_list = listdir(input_directory)
        if len(directory_list) == 0:
            raise ValueError(f"Input directory {input_directory} must include files")
        threads = []
        for path in directory_list:
            full_path = join(input_directory, path)
            if isfile(full_path):
                t = threading.Thread(target=ingest_to_db, args=(full_path,))
                threads.append(t)
        self.read_con = psycopg2.connect(
            database=settings.db_name,
            user=settings.db_user,
            host=settings.db_host,
            port=settings.db_port,
            cursor_factory=psycopg2.extras.RealDictCursor,
        )
        self.write_con = psycopg2.connect(
            database=settings.db_name,
            user=settings.db_user,
            host=settings.db_host,
            port=settings.db_port,
        )
        for t in threads:
            t.start()

        for t in threads:
            t.join()

    # pylint: disable=duplicate-code
    def deduper(self):
        try:
            with open(self.settings_file_path, "rb") as sf:
                print(
                    f"""time: {time.asctime(time.localtime())} -
                        reading from {self.settings_file_path}"""
                )
                model = dedupe.StaticDedupe(sf)
        except FileNotFoundError:
            model = dedupe.Dedupe(self.fields())
            model = self.train_and_write_model(model)

        return model

    # pylint: enable=duplicate-code

    def prepare_training(self, model):
        with self.read_con.cursor("record_select") as cur:
            print("Building temporary dictionary of records for training")
            cur.execute(RECORD_SELECT)
            temp_d = dict(enumerate(cur))
        try:
            with open(self.training_file_path, encoding="utf-8") as tf:
                print(
                    f"""time: {time.asctime(time.localtime())} -
                    Loading training data from {self.training_file_path}
                     - you can skip console label if you would like"""
                )
                our_model = model.prepare_training(temp_d, training_file=tf)
                print(f"time: {time.asctime(time.localtime())} - training file loaded")
        except FileNotFoundError:
            print(
                f"time: {time.asctime(time.localtime())} - "
                "No training file found, preparing training"
            )
            our_model = model.prepare_training(temp_d)

        print(f"time: {time.asctime(time.localtime())} - deleting temp dictionary")
        del temp_d

        return our_model

    def train_and_write_model(self, model):
        print(f"time: {time.asctime(time.localtime())} - about to prepare training")
        self.prepare_training(model)
        print(f"time: {time.asctime(time.localtime())} - about to console label")
        self.console_label(model)
        model.train()
        # When finished, save our training away to disk
        self.write_training(model)
        self.write_settings(model)
        # Remove memory intensive objects used for training
        model.cleanup_training()
        return model

    def block(self, model):
        print(f"time: {time.asctime(time.localtime())} - blocking...")
        print(f"time: {time.asctime(time.localtime())} - creating blocking_map table")
        with self.write_con:
            with self.write_con.cursor() as cur:
                cur.execute("DROP TABLE IF EXISTS blocking_map")
                cur.execute("CREATE TABLE blocking_map (block_key text, id TEXT)")
        print(f"time: {time.asctime(time.localtime())} - creating inverted index")
        for field in model.fingerprinter.index_fields:
            # The four lines below are new in this pull request and are the
            # only lines in this file left uncovered by tests in this build.
            with self.read_con.cursor("field_values") as cur:
                cur.execute(f"SELECT DISTINCT {field} FROM records")
                field_data = (row[field] for row in cur)
                model.fingerprinter.index(field_data, field)

        print(f"time: {time.asctime(time.localtime())} - writing blocking map")
        with self.read_con.cursor("record_select") as read_cur:
            read_cur.execute(RECORD_SELECT)

            full_data = ((row["id"], row) for row in read_cur)
            b_data = model.fingerprinter(full_data)

            with self.write_con:
                with self.write_con.cursor() as write_cur:
                    write_cur.copy_expert(
                        "COPY blocking_map FROM STDIN WITH CSV",
                        Readable(b_data),
                        size=25000,
                    )

        model.fingerprinter.reset_indices()
        print(f"time: {time.asctime(time.localtime())} - indexing block_key")
        with self.write_con:
            with self.write_con.cursor() as cur:
                cur.execute(
                    "CREATE UNIQUE INDEX ON blocking_map "
                    "(block_key text_pattern_ops, id)"
                )

    def cluster(self, model):
        with self.write_con:
            with self.write_con.cursor() as cur:
                cur.execute("DROP TABLE IF EXISTS entity_map")

                print(
                    f"time: {time.asctime(time.localtime())} - creating entity_map table"
                )
                cur.execute(
                    "CREATE TABLE entity_map "
                    "(id TEXT, cluster_id TEXT, "
                    " cluster_score FLOAT, PRIMARY KEY(id))"
                )
        with open("pairs.sql", "r", encoding="utf-8") as file:
            pairs_sql = file.read()
        with self.read_con.cursor(
            "pairs", cursor_factory=psycopg2.extensions.cursor
        ) as read_cur:
            read_cur.execute(pairs_sql)
            print(f"time: {time.asctime(time.localtime())} - clustering...")
            clustered_dupes = model.cluster(
                model.score(self.record_pairs(read_cur)), threshold=self.match_threshold
            )
            print(
                f"time: {time.asctime(time.localtime())} - writing results to database"
            )
            # this is very slow on large data sets and only across two CPUs
            # Is there a way to multi-thread here?
            # Also using a ton of swap memory
            # Does not seem to be writing to database in chunks?
            # Even though I think it's writing to stdin in chunks?
            with self.write_con:
                with self.write_con.cursor() as write_cur:
                    write_cur.copy_expert(
                        "COPY entity_map FROM STDIN WITH CSV",
                        Readable(cluster_ids(clustered_dupes)),
                        size=10000,
                    )
        print(f"time: {time.asctime(time.localtime())} - adding index to entity_map")
        with self.write_con:
            with self.write_con.cursor() as cur:
                cur.execute("CREATE INDEX head_index ON entity_map (cluster_id)")

    def record_pairs(self, result_set):
        for i, row in enumerate(result_set):
            a_record_id, a_record, b_record_id, b_record = row
            record_a = (a_record_id, a_record)
            record_b = (b_record_id, b_record)

            yield record_a, record_b

            if i % 10000 == 0:
                print(i)

    def create_table_for_csv(self):
        print(
            f"time: {time.asctime(time.localtime())} - creating table for output to csv"
        )
        with self.read_con.cursor() as cur:
            cur.execute("""CREATE TEMPORARY TABLE for_csv
                        AS SELECT cluster_id, entity_map.id, cluster_score, title, publication_year, pagination, edition, publisher_name, type_of, is_electronic_resource, source_file
                        FROM entity_map
                        INNER JOIN records
                        ON entity_map.id = records.id
                        ORDER BY cluster_id, cluster_score;
""")

    def write_to_csv(self):
        print(f"time: {time.asctime(time.localtime())} - writing results to csv")
        with self.read_con.cursor() as cur:
            with open(self.output_file_path, "w", encoding="utf-8") as file:
                cur.copy_expert("COPY for_csv TO STDOUT WITH CSV HEADER", file)


def cluster_ids(clustered_dupes):
    for cluster, scores in clustered_dupes:
        cluster_id = cluster[0]
        for record_id, score in zip(cluster, scores):
            yield record_id, cluster_id, score


def ingest_to_db(full_path):
    MarcToDb(full_path).to_db()
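Both COPY ... FROM STDIN calls above hand copy_expert() a Readable(...) wrapper imported from src.readable, which this report does not show. copy_expert() only needs an object exposing read(size) that returns an empty string at end of data, so an adapter along the following lines would satisfy it. This is a sketch under that assumption, not the repository's implementation; if the real class follows the same pattern, the size=25000 and size=10000 arguments effectively act as a rows-per-chunk setting.

# Illustrative sketch only; the repository's src/readable.py is not shown
# in this report and may differ.
import csv
import io
import itertools


class Readable:
    """File-like adapter: turns an iterable of row tuples into CSV text
    that psycopg2's copy_expert() can consume via read(size) calls."""

    def __init__(self, iterator):
        self.output = io.StringIO()
        self.writer = csv.writer(self.output)
        self.iterator = iter(iterator)

    def read(self, size):
        # Encode the next `size` rows as CSV; an empty string signals EOF.
        self.writer.writerows(itertools.islice(self.iterator, size))
        chunk = self.output.getvalue()
        self.output.seek(0)
        self.output.truncate(0)
        return chunk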
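cluster() loads its candidate-pair query from pairs.sql, which is also not included in this report. Whatever that file contains, record_pairs() unpacks each row as (a_record_id, a_record, b_record_id, b_record), so the query must return two ids plus two full records per candidate pair drawn from blocking_map. A self-join along the following lines would satisfy that contract; the repository's actual SQL may differ, so treat the aliases and the row_to_json usage as assumptions made only to illustrate the expected row shape.

# Hypothetical stand-in for pairs.sql, shown only to illustrate the row shape
# record_pairs() expects; the repository's real query may differ.
PAIRS_SELECT_SKETCH = """SELECT a.id, row_to_json(a),
                                b.id, row_to_json(b)
                         FROM (SELECT DISTINCT l.id AS east, r.id AS west
                               FROM blocking_map AS l
                               INNER JOIN blocking_map AS r USING (block_key)
                               WHERE l.id < r.id) ids
                         INNER JOIN records a ON ids.east = a.id
                         INNER JOIN records b ON ids.west = b.id;
                      """

With a query of this shape, psycopg2's default JSON typecaster returns the row_to_json columns as Python dicts, which is the per-record form that dedupe's scoring step works with.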
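Taken together, the methods above define a Postgres-backed dedupe pipeline: ingest MARC files into a records table, load or train a model, block, cluster, and export. The file does not include a driver, so the following sketch of how it might be invoked is an assumption based only on the method names and signatures above; the paths and the match_threshold value are placeholders.

# Hypothetical driver, not part of the covered file; call order and paths
# are assumptions based on the methods defined above.
from src.db_dedupe_records import DbDedupeRecords

ddr = DbDedupeRecords("data/input_marc", "data/output", match_threshold=0.5)
model = ddr.deduper()        # load saved settings, or train via console labeling
ddr.block(model)             # build the blocking_map table
ddr.cluster(model)           # score candidate pairs and populate entity_map
ddr.create_table_for_csv()   # join clusters back to the bibliographic fields
ddr.write_to_csv()           # COPY the joined table out to the output CSV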