import dedupe
import unittest
import random
import numpy
import warnings
from collections import OrderedDict
DATA_SAMPLE = ((dedupe.core.frozendict({'age': '27', 'name': 'Kyle'}),
dedupe.core.frozendict({'age': '50', 'name': 'Bob'})),
(dedupe.core.frozendict({'age': '27', 'name': 'Kyle'}),
dedupe.core.frozendict({'age': '35', 'name': 'William'})),
(dedupe.core.frozendict({'age': '10', 'name': 'Sue'}),
dedupe.core.frozendict({'age': '35', 'name': 'William'})),
(dedupe.core.frozendict({'age': '27', 'name': 'Kyle'}),
dedupe.core.frozendict({'age': '20', 'name': 'Jimmy'})),
(dedupe.core.frozendict({'age': '75', 'name': 'Charlie'}),
dedupe.core.frozendict({'age': '21', 'name': 'Jimbo'})))
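# The DATA_SAMPLE fixture above is a tuple of candidate record pairs (each
# pair holds two frozendict records), the shape dedupe scores when comparing
# records; data_dict below is a small record dictionary keyed by record id.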
data_dict = OrderedDict(((0, {'name' : 'Bob', 'age' : '51'}),
(1, {'name' : 'Linda', 'age' : '50'}),
(2, {'name' : 'Gene', 'age' : '12'}),
(3, {'name' : 'Tina', 'age' : '15'}),
(4, {'name' : 'Bob B.', 'age' : '51'}),
(5, {'name' : 'bob belcher', 'age' : '51'}),
ipc_codes = [i for i in ipc_codes if len(i.strip()) > 0]
else:
ipc_codes = str(dfrow['ipc_sector']).split('.')[0]
if isinstance(dfrow['name'], str):
name = dfrow['name']
else:
name = ''
# row_out['Coauthor_Count'] = len(coauthors)
# row_out['Class_Count'] = len(classes)
row_out['LatLong'] = (float(dfrow['lat']), float(dfrow['lng']))
row_out['name'] = name
row_out['ipc_sector'] = ipc_codes
row_out['dbase'] = row_type
row_tuple = [(k, v) for (k, v) in row_out.items()]
data_d[idx] = dedupe.core.frozendict(row_tuple)
return data_d
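# --- Illustrative usage sketch (not part of the original source) ---
# A minimal example of feeding a record dictionary shaped like data_d into
# dedupe.  The field definition and the 1.x-era calls (Dedupe, sample,
# consoleLabel, train, match) are assumptions chosen to match the fields
# built above, not code taken from this project.
def example_dedupe_run(data_d):
    fields = [{'field': 'name', 'type': 'String'},
              {'field': 'LatLong', 'type': 'LatLong'}]
    deduper = dedupe.Dedupe(fields)
    deduper.sample(data_d, 15000)   # draw candidate pairs for labeling
    dedupe.consoleLabel(deduper)    # label a handful of pairs by hand
    deduper.train()
    return deduper.match(data_d, threshold=0.5)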
def __init__(self,
data_model,
data,
blocked_proportion,
sample_size,
original_length,
index_include):
self.data_model = data_model
data = core.index(data)
self.candidates = super().sample(data, blocked_proportion, sample_size)
self.blocker = DedupeBlockLearner(data_model,
self.candidates,
data,
original_length,
index_include)
self._common_init()
def sample(self, data, blocked_proportion, sample_size):
blocked_sample_size = int(blocked_proportion * sample_size)
predicates = list(self.data_model.predicates(index_predicates=False))
data = sampling.randomDeque(data)
blocked_sample_keys = sampling.dedupeBlockedSample(blocked_sample_size,
predicates,
data)
random_sample_size = sample_size - len(blocked_sample_keys)
random_sample_keys = set(core.randomPairs(len(data),
random_sample_size))
data = dict(data)
return [(data[k1], data[k2])
for k1, k2
in blocked_sample_keys | random_sample_keys]
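# Worked example of the split above (illustrative numbers, not from the
# source): with sample_size=1000 and blocked_proportion=0.5, up to
# int(0.5 * 1000) = 500 pairs come from records that share a predicate
# block, and random_sample_size = 1000 - len(blocked_sample_keys) further
# pairs are drawn uniformly at random via core.randomPairs.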
blocks -- Sequence of tuples of records, where each tuple is a
set of records covered by a blocking predicate
threshold -- Number between 0 and 1 (default is .5). We will
only consider record pairs as duplicates if their
estimated duplicate likelihood is greater than the
threshold. Lowering the threshold will increase
recall; raising it will increase precision.
(An illustrative usage sketch follows this method.)
"""
candidate_records = self._blockedPairs(blocks)
matches = core.scoreDuplicates(candidate_records,
self.data_model,
self.classifier,
self.num_cores,
threshold)
logger.debug("matching done, begin clustering")
clusters = self._cluster(matches, threshold, *args, **kwargs)
try:
match_file = matches.filename
del matches
os.remove(match_file)
except AttributeError:
pass
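# --- Illustrative usage sketch (not part of the original source) ---
# How the threshold trade-off described in the docstring above plays out.
# The `deduper` object, the matchBlocks call, and the returned cluster shape
# (record ids with confidence scores) are assumptions for illustration only.
def example_threshold_tradeoff(deduper, blocks):
    high_precision = deduper.matchBlocks(blocks, threshold=0.9)  # fewer, safer merges
    high_recall = deduper.matchBlocks(blocks, threshold=0.3)     # more merges, more noise
    return high_precision, high_recall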
blocks -- Sequence of tuples of records, where each tuple is a
set of records covered by a blocking predicate
threshold -- Number between 0 and 1 (default is .5). We will
only consider record pairs as duplicates if their
estimated duplicate likelihood is greater than the
threshold. Lowering the threshold will increase
recall; raising it will increase precision.
(A sketch of consuming the yielded clusters follows this method.)
"""
candidate_records = itertools.chain.from_iterable(self._blockedPairs(blocks))
matches = core.scoreDuplicates(candidate_records,
self.data_model,
self.classifier,
self.num_cores,
threshold=0)
logger.debug("matching done, begin clustering")
for cluster in self._cluster(matches, threshold, *args, **kwargs):
yield cluster
try:
match_file = matches.filename
del matches
os.remove(match_file)
except AttributeError:
pass
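# --- Illustrative usage sketch (not part of the original source) ---
# The method above yields clusters one at a time, so results can be consumed
# lazily instead of being held in memory all at once.  The `matcher` object
# and the (record_ids, scores) cluster shape are assumptions for illustration.
def example_stream_clusters(matcher, blocks):
    for record_ids, scores in matcher.matchBlocks(blocks, threshold=0.5):
        print(record_ids, scores)   # handle each cluster as soon as it is ready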
def coveredPairs(blocker, records):
cover = {}
pair_enumerator = core.Enumerator()
n_records = len(records)
for predicate in blocker.predicates:
pred_cover = collections.defaultdict(set)
for id, record in viewitems(records):
blocks = predicate(record)
for block in blocks:
pred_cover[block].add(id)
if not pred_cover:
continue
max_cover = max(len(v) for v in pred_cover.values())
if max_cover == n_records:
continue
data = sampling.randomDeque(data)
blocked_sample_keys = sampling.dedupeBlockedSample(blocked_sample_size,
predicates,
data)
random_sample_size = sample_size - len(blocked_sample_keys)
random_sample_keys = set(core.randomPairs(len(data),
random_sample_size))
data = dict(data)
data_sample = [(data[k1], data[k2])
for k1, k2
in blocked_sample_keys | random_sample_keys]
data_sample = core.freezeData(data_sample)
self._loadSample(data_sample)
blocks -- Sequence of tuples of records, where each tuple is a
set of records covered by a blocking predicate
threshold -- Number between 0 and 1 (default is .5). We will
only consider record pairs as duplicates if their
estimated duplicate likelihood is greater than the
threshold. Lowering the threshold will increase
recall; raising it will increase precision.
(An illustrative gazetteer usage sketch follows this method.)
"""
candidate_records = self._blockedPairs(blocks)
matches = core.scoreGazette(candidate_records,
self.data_model,
self.classifier,
self.num_cores,
threshold=threshold)
logger.debug("matching done, begin clustering")
return self._cluster(matches, *args, **kwargs)
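# --- Illustrative usage sketch (not part of the original source) ---
# In the gazetteer workflow each messy record keeps its best-scoring canonical
# matches rather than being merged into transitive clusters.  The `gazetteer`
# object and the n_matches keyword are assumptions based on dedupe's
# Gazetteer interface.
def example_gazette_match(gazetteer, blocks):
    # keep at most the two best-scoring canonical matches per messy record
    return gazetteer.matchBlocks(blocks, threshold=0.5, n_matches=2)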
def _blockedPairs(self, blocks):
"""
Generate pairs of records from each block of records
(a toy example follows this method).
Arguments:
blocks -- an iterable sequence of blocked records
"""
block, blocks = core.peek(blocks)
self._checkBlock(block)
product = itertools.product
pairs = (product(base, target) for base, target in blocks)
return itertools.chain.from_iterable(pairs)
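# --- Illustrative example (not part of the original source) ---
# What _blockedPairs does with a single record-link block: every record on
# the base side is paired with every record on the target side.  The toy
# records below are made up for illustration.
def example_blocked_pairs():
    import itertools
    base = [(1, {'name': 'Bob'}), (2, {'name': 'Bob B.'})]
    target = [(9, {'name': 'bob belcher'})]
    # product pairs (1, ...) with (9, ...) and (2, ...) with (9, ...)
    return list(itertools.product(base, target))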