pulibrary / pymarc_dedupe
Build 4f06df0c-5da3-4707-bade-63cea0efc8d5 · circleci

19 May 2025 03:37PM UTC · coverage: 99.595% (-0.4% from 100.0%)

Pull Request #24: Green locally - connect to Postgres DB
maxkadel · Try to increase test coverage

158 of 161 new or added lines in 5 files covered (98.14%)
737 of 740 relevant lines covered (99.59%)
1.0 hits per line

Source file: /src/db_dedupe_records.py (97.14% covered)

from os import listdir
from os.path import isfile, join

import dedupe
import psycopg2
import psycopg2.extras

from config import settings
from src.marc_to_db import MarcToDb
from src.machine_learning_model import MachineLearningModel
from src.readable import Readable

RECORD_SELECT = """SELECT
                    id, title, author, publication_year, pagination, edition, publisher_name, type_of, is_electronic_resource
                    FROM records;
                """
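
The connection parameters used below come from a `config.settings` object that is not part of this file. A minimal stand-in, assuming `settings` only needs the four attributes the `psycopg2.connect` calls read:

# Hypothetical stand-in for config.settings; the real project presumably
# loads these from a settings file or the environment.
import os
from types import SimpleNamespace

settings = SimpleNamespace(
    db_name=os.environ.get("DB_NAME", "pymarc_dedupe"),
    db_user=os.environ.get("DB_USER", "postgres"),
    db_host=os.environ.get("DB_HOST", "localhost"),
    db_port=os.environ.get("DB_PORT", "5432"),
)
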
class DbDedupeRecords(MachineLearningModel):
    def __init__(self, input_directory, output_directory, match_threshold=0.5):
        super().__init__(output_directory, match_threshold)
        # Load every MARC file in the input directory into the records table
        for path in listdir(input_directory):
            full_path = join(input_directory, path)
            if isfile(full_path):
                MarcToDb(full_path).to_db()
        # Read connection returns rows as dicts, the shape dedupe works with
        self.read_con = psycopg2.connect(
            database=settings.db_name,
            user=settings.db_user,
            host=settings.db_host,
            port=settings.db_port,
            cursor_factory=psycopg2.extras.RealDictCursor,
        )
        # Write connection keeps the default tuple cursor for COPY and DDL
        self.write_con = psycopg2.connect(
            database=settings.db_name,
            user=settings.db_user,
            host=settings.db_host,
            port=settings.db_port,
        )
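
The two connections differ deliberately: the read connection hands rows back as dictionaries, while the write connection keeps plain tuples. A quick illustration of what `RealDictCursor` changes, assuming a reachable local Postgres database named "pymarc_dedupe" (the name is a placeholder):

# Sketch: same query, two cursor factories.
import psycopg2
import psycopg2.extras

plain = psycopg2.connect(database="pymarc_dedupe")
dicts = psycopg2.connect(
    database="pymarc_dedupe", cursor_factory=psycopg2.extras.RealDictCursor
)

with plain.cursor() as cur:
    cur.execute("SELECT 1 AS id, 'A Title' AS title")
    print(cur.fetchone())  # positional tuple: (1, 'A Title')

with dicts.cursor() as cur:
    cur.execute("SELECT 1 AS id, 'A Title' AS title")
    row = cur.fetchone()
    print(row["title"])  # dict-style access, which dedupe's record dicts rely on
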
    def deduper(self):
        try:
            # These two lines are the report's uncovered lines in this method:
            # the tests never run against a previously saved settings file.
            with open(self.settings_file_path, "rb") as sf:
                print("reading from", self.settings_file_path)
                model = dedupe.StaticDedupe(sf)
        except FileNotFoundError:
            model = dedupe.Dedupe(self.fields())
            model = self.train_and_write_model(model)

        return model
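
`self.fields()` is defined on `MachineLearningModel`, which this report does not include. Given the columns `RECORD_SELECT` pulls, a plausible shape for the variable definitions passed to `dedupe.Dedupe` is the dict style accepted by dedupe 2.x (newer releases use variable objects); the field choices here are illustrative, not the project's actual configuration:

# Illustrative only: the actual fields() lives in MachineLearningModel.
# The keys mirror the RECORD_SELECT columns dedupe is asked to compare.
fields = [
    {"field": "title", "type": "String"},
    {"field": "author", "type": "String", "has missing": True},
    {"field": "publication_year", "type": "Exact", "has missing": True},
    {"field": "publisher_name", "type": "String", "has missing": True},
    {"field": "pagination", "type": "String", "has missing": True},
]
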
    def prepare_training(self, model):
        with self.read_con.cursor("donor_select") as cur:
            cur.execute(RECORD_SELECT)
            temp_d = dict(enumerate(cur))
        try:
            with open(self.training_file_path, encoding="utf-8") as tf:
                print(
                    f"Loading training data from {self.training_file_path} - "
                    "you can skip console labeling if you would like"
                )
                model.prepare_training(temp_d, training_file=tf)
        except FileNotFoundError:
            model.prepare_training(temp_d)
        # Free the in-memory copy of the records. In the original, this `del`
        # sat after two `return` statements and could never run (the report
        # marks it uncovered); prepare_training mutates the model in place,
        # so nothing needs to be returned.
        del temp_d
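
`dict(enumerate(cur))` streams the whole result set into memory as the {key: record} mapping that `prepare_training` samples from; the integer keys are arbitrary. In miniature:

# What dict(enumerate(...)) produces from a stream of row dicts
rows = iter(
    [
        {"id": "a1", "title": "Moby Dick"},
        {"id": "a2", "title": "Moby-Dick"},
    ]
)
temp_d = dict(enumerate(rows))
print(temp_d[0]["title"], temp_d[1]["title"])  # Moby Dick Moby-Dick
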
    def train_and_write_model(self, model):
        self.prepare_training(model)
        self.console_label(model)
        model.train()
        # When finished, save our training away to disk
        self.write_training(model)
        self.write_settings(model)
        # Remove memory-intensive objects used for training
        model.cleanup_training()
        return model
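
`console_label`, `write_training`, and `write_settings` are inherited from `MachineLearningModel` and not shown in this report. In dedupe's standard workflow they would be thin wrappers over the library calls below; the file paths are placeholders, not the project's actual code:

# Plausible shape of the inherited helpers
import dedupe

def console_label(model):
    dedupe.console_label(model)  # interactive duplicate/not-duplicate prompts

def write_training(model, path="training.json"):
    with open(path, "w", encoding="utf-8") as tf:
        model.write_training(tf)  # labeled pairs, as JSON

def write_settings(model, path="learned.settings"):
    with open(path, "wb") as sf:
        model.write_settings(sf)  # binary blob StaticDedupe can reload
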
    def block(self, model):
        print("blocking...")
        print("creating blocking_map table")
        with self.write_con:
            with self.write_con.cursor() as cur:
                cur.execute("DROP TABLE IF EXISTS blocking_map")
                cur.execute("CREATE TABLE blocking_map (block_key TEXT, id TEXT)")
        print("creating inverted index")
        for field in model.fingerprinter.index_fields:
            with self.read_con.cursor("field_values") as cur:
                cur.execute(f"SELECT DISTINCT {field} FROM records")
                field_data = (row[field] for row in cur)
                model.fingerprinter.index(field_data, field)

        print("writing blocking map")
        with self.read_con.cursor("donor_select") as read_cur:
            read_cur.execute(RECORD_SELECT)

            full_data = ((row["id"], row) for row in read_cur)
            b_data = model.fingerprinter(full_data)

            with self.write_con:
                with self.write_con.cursor() as write_cur:
                    write_cur.copy_expert(
                        "COPY blocking_map FROM STDIN WITH CSV",
                        Readable(b_data),
                        size=10000,
                    )

        model.fingerprinter.reset_indices()
        print("indexing block_key")
        with self.write_con:
            with self.write_con.cursor() as cur:
                cur.execute(
                    "CREATE UNIQUE INDEX ON blocking_map "
                    "(block_key text_pattern_ops, id)"
                )
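
`copy_expert` streams its input from a file-like object, but the fingerprinter yields (block_key, id) tuples; `src.readable.Readable` has to bridge the two. That class isn't in this report, but dedupe's Postgres example ships essentially this adapter, which is presumably what the import provides:

# Sketch of a Readable adapter: turns an iterator of tuples into the
# file-like object copy_expert reads CSV text from.
import csv
import io
import itertools

class Readable:
    def __init__(self, iterator):
        self.output = io.StringIO()
        self.writer = csv.writer(self.output)
        self.iterator = iterator

    def read(self, size):
        # Encode the next `size` rows as CSV; an empty string signals
        # end-of-stream to copy_expert.
        self.writer.writerows(itertools.islice(self.iterator, size))
        chunk = self.output.getvalue()
        self.output.seek(0)
        self.output.truncate(0)
        return chunk
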
    def cluster(self, model):
        with self.write_con:
            with self.write_con.cursor() as cur:
                cur.execute("DROP TABLE IF EXISTS entity_map")

                print("creating entity_map table")
                cur.execute(
                    "CREATE TABLE entity_map "
                    "(id TEXT, canon_id TEXT, "
                    " cluster_score FLOAT, PRIMARY KEY(id))"
                )
        with open("pairs.sql", "r", encoding="utf-8") as file:
            pairs_sql = file.read()
        # A plain tuple cursor here: each row is unpacked positionally below
        with self.read_con.cursor(
            "pairs", cursor_factory=psycopg2.extensions.cursor
        ) as read_cur:
            read_cur.execute(pairs_sql)
            print("clustering...")
            clustered_dupes = model.cluster(
                model.score(self.record_pairs(read_cur)), threshold=self.match_threshold
            )
            print("writing results to database")
            with self.write_con:
                with self.write_con.cursor() as write_cur:
                    write_cur.copy_expert(
                        "COPY entity_map FROM STDIN WITH CSV",
                        Readable(cluster_ids(clustered_dupes)),
                        size=10000,
                    )
        with self.write_con:
            with self.write_con.cursor() as cur:
                cur.execute("CREATE INDEX head_index ON entity_map (canon_id)")
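
pairs.sql is read from disk and never shown in this report. Modeled on dedupe's Postgres example, a plausible query pairs every two records that share a block_key and returns each record as JSON, matching the four-column rows `record_pairs()` unpacks below. This is a reconstruction, not the project's actual file:

# Hypothetical pairs.sql contents; a real version would also need to
# deduplicate pairs that co-occur in more than one block.
PAIRS_SQL = """
SELECT a.id, row_to_json(a), b.id, row_to_json(b)
FROM blocking_map bm_a
JOIN blocking_map bm_b
  ON bm_a.block_key = bm_b.block_key AND bm_a.id < bm_b.id
JOIN records a ON a.id = bm_a.id
JOIN records b ON b.id = bm_b.id
"""

psycopg2 parses json columns into Python dicts on the way out, so each row arrives as (id, dict, id, dict), which is exactly what the scoring step needs.
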
    def record_pairs(self, result_set):
        for i, row in enumerate(result_set):
            a_record_id, a_record, b_record_id, b_record = row
            record_a = (a_record_id, a_record)
            record_b = (b_record_id, b_record)

            yield record_a, record_b

            # Coarse progress indicator for long scoring passes
            if i % 10000 == 0:
                print(i)
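
`record_pairs` only reshapes each flat four-column row into the ((id, record), (id, record)) pairs that `model.score` consumes. The reshaping, standalone:

# One flat row from the pairs query, reshaped the way record_pairs yields it
row = ("a1", {"title": "Moby Dick"}, "a2", {"title": "Moby-Dick"})
a_record_id, a_record, b_record_id, b_record = row
pair = ((a_record_id, a_record), (b_record_id, b_record))
print(pair[0][0], pair[1][0])  # a1 a2
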
def cluster_ids(clustered_dupes):
    # The first member of each cluster serves as the cluster's canonical id
    for cluster, scores in clustered_dupes:
        cluster_id = cluster[0]
        for donor_id, score in zip(cluster, scores):
            yield donor_id, cluster_id, score
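
`cluster_ids` flattens dedupe's clustered output into exactly the (id, canon_id, cluster_score) rows that entity_map stores. Nothing in the file chains the pipeline together; a plausible driver, with hypothetical directory names, would be:

# Hypothetical end-to-end run; input/output paths are made up.
if __name__ == "__main__":
    ddr = DbDedupeRecords("data/input_marc", "data/output")
    model = ddr.deduper()  # reuse saved settings, or train interactively
    ddr.block(model)       # rebuild the blocking_map table
    ddr.cluster(model)     # score candidate pairs, write entity_map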