• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

pulibrary / pymarc_dedupe / c1d8db95-7c95-49d2-adb2-02cb1bbc7196

04 Feb 2025 05:59PM UTC coverage: 99.769% (+0.3%) from 99.479%
c1d8db95-7c95-49d2-adb2-02cb1bbc7196

push

circleci

maxkadel
Finish class and script that run ML algorithm on MarcXML files

Need more testing, documentation, and un-nesting some iterators

57 of 58 new or added lines in 1 file covered. (98.28%)

431 of 432 relevant lines covered (99.77%)

1.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.61
/src/link_records.py
1
import os.path
1✔
2
import csv
1✔
3
import dedupe
1✔
4
from src.link_records_file import LinkRecordsFile
1✔
5

6

7
class LinkRecords:
    """Link records between two MarcXML-derived CSV files with the dedupe library.

    Wraps the dedupe record-linkage workflow: build or load a trained linker
    (interactive console labeling when no saved settings exist), cluster the
    two data sets, and write one combined CSV tagging each input row with its
    cluster membership.
    """

    def __init__(self, left_file, right_file, output_directory):
        """Prepare input data and output paths.

        left_file / right_file: inputs accepted by LinkRecordsFile.
        output_directory: directory for the output CSV, learned settings, and
            training JSON; created if it does not already exist.
        """
        # exist_ok avoids the check-then-create race of exists()+makedirs().
        os.makedirs(output_directory, exist_ok=True)
        # Build each LinkRecordsFile once and reuse it — the original built
        # each one twice, duplicating any file conversion/parsing work.
        left = LinkRecordsFile(left_file)
        right = LinkRecordsFile(right_file)
        self.left_file = left.csv_path
        self.right_file = right.csv_path
        self.left_data = left.read_data()
        self.right_data = right.read_data()
        self.output_file_path = os.path.join(
            output_directory, "data_matching_output.csv"
        )
        self.settings_file_path = os.path.join(
            output_directory, "data_matching_learned_settings"
        )
        self.training_file_path = os.path.join(
            output_directory, "data_matching_training.json"
        )

    def fields(self):
        """Return the dedupe variable definitions used to compare records."""
        return [
            dedupe.variables.String("title"),
            dedupe.variables.String("author", has_missing=True),
            dedupe.variables.String("publication_year"),
            dedupe.variables.String("pagination", has_missing=True),
            # "edition" contributes twice: once on presence, once on value.
            dedupe.variables.Exists("edition"),
            dedupe.variables.String("edition", has_missing=True),
            dedupe.variables.String("publisher_name", has_missing=True),
            dedupe.variables.Exact("type_of"),
            dedupe.variables.Exact("is_electronic_resource"),
        ]

    def linker(self):
        """Return a trained linker.

        Loads a StaticRecordLink from previously saved settings when present;
        otherwise trains a new RecordLink interactively and persists both the
        labeled training data and the learned settings for future runs.
        """
        try:
            with open(self.settings_file_path, "rb") as sf:
                print("reading from", self.settings_file_path)
                linker = dedupe.StaticRecordLink(sf)
        except FileNotFoundError:
            linker = dedupe.RecordLink(self.fields())
            self.prepare_training(linker)
            self.console_label(linker)
            linker.train()
            # When finished, save our training away to disk
            self.write_training(linker)
            self.write_settings(linker)

        return linker

    def prepare_training(self, linker):
        """Sample candidate pairs, seeding from saved labels when available."""
        try:
            with open(self.training_file_path, encoding="utf-8") as tf:
                return linker.prepare_training(
                    self.left_data, self.right_data, training_file=tf, sample_size=1500
                )
        except FileNotFoundError:
            # No prior training file: start labeling from scratch.
            return linker.prepare_training(
                self.left_data, self.right_data, sample_size=1500
            )

    def console_label(self, linker):
        """Interactively label candidate pairs on the console."""
        print("starting active labeling...")
        dedupe.console_label(linker)

    def cluster(self, linker):
        """Join the two data sets and write cluster membership to the output CSV.

        Uses a 0.3 score threshold with many-to-many matching, then records
        each member record's cluster id and link score.
        """
        print("clustering...")
        linked_records = linker.join(
            self.left_data, self.right_data, 0.3, "many-to-many"
        )
        print("# duplicate sets", len(linked_records))

        cluster_membership = {}
        for cluster_id, (cluster, score) in enumerate(linked_records):
            for record_id in cluster:
                cluster_membership[record_id] = {
                    "Cluster ID": cluster_id,
                    "Link Score": score,
                }
        self.write_output(cluster_membership)

    def write_training(self, linker):
        """Persist the labeled training examples as JSON."""
        with open(self.training_file_path, "w", encoding="utf-8") as tf:
            linker.write_training(tf)

    def write_settings(self, linker):
        """Persist the learned model settings (binary)."""
        with open(self.settings_file_path, "wb") as sf:
            linker.write_settings(sf)

    def write_output(self, cluster_membership):
        """Write both input CSVs into one output CSV annotated with clusters.

        cluster_membership maps record ids (input csv path + row index) to
        {"Cluster ID": ..., "Link Score": ...}; rows without a match get
        blank values for those columns.
        """
        print("Writing duplicates to output file path: " + self.output_file_path)
        # newline="" is required by the csv module; without it, Windows
        # output gets doubled line endings.
        with open(self.output_file_path, "w", encoding="utf-8", newline="") as f:
            header_unwritten = True

            for fileno, filename in enumerate((self.left_file, self.right_file)):
                with open(filename, encoding="utf-8") as f_input:
                    reader = csv.DictReader(f_input)

                    if header_unwritten:
                        # Header comes from the first file only; both inputs
                        # are assumed to share a column layout — TODO confirm.
                        fieldnames = [
                            "Cluster ID",
                            "Link Score",
                            "source file",
                        ] + reader.fieldnames
                        writer = csv.DictWriter(f, fieldnames=fieldnames)
                        writer.writeheader()

                        header_unwritten = False

                    for row_id, row in enumerate(reader):
                        # Record ids were built as source csv path + row index.
                        record_id = filename + str(row_id)
                        cluster_details = cluster_membership.get(record_id, {})
                        row["source file"] = fileno
                        row.update(cluster_details)

                        writer.writerow(row)
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc