Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_random_pair(self):
    """Seeded regression tests for dedupe.core.randomPairs.

    The expected pair lists are pinned snapshot values taken under
    seed 123; they differ between Python 2 and 3 because the stdlib
    random module's sampling changed between major versions.
    NOTE: statement order matters — each call consumes RNG state.
    """
    # Sampling 10 pairs from a single record is impossible.
    self.assertRaises(ValueError, dedupe.core.randomPairs, 1, 10)
    # Asking for all 45 possible pairs of 10 records returns a
    # non-empty (truthy) sample.
    assert dedupe.core.randomPairs(10, 10)

    random.seed(123)
    numpy.random.seed(123)
    random_pairs = dedupe.core.randomPairs(10, 5)
    # Snapshot of the seeded 5-pair sample from 10 records.
    assert random_pairs == [( 0, 3),
                            ( 3, 8),
                            ( 4, 9),
                            ( 5, 9),
                            ( 2, 3)]

    # Requesting more pairs than exist warns and caps the sample.
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        dedupe.core.randomPairs(10, 10**6)
        assert len(w) == 1
        assert str(w[-1].message) == "Requested sample of size 1000000, only returning 45 possible pairs"

    # Snapshot values diverge across Python major versions.
    if sys.version_info < (3, 0):
        target = [(0, 3), (0, 4), (2, 4), (0, 5), (6, 8)]
    else:
        target = [(0, 4), (2, 3), (0, 6), (3, 6), (0, 7)]

    random_pairs = list(dedupe.core.randomPairs(10, 5))
    assert random_pairs == target

    random.seed(123)
    if sys.version_info < (3, 0):
        target = [(265, 3429)]
    else:
        target = [(357, 8322)]

    random_pairs = list(dedupe.core.randomPairs(10**4, 1))
    assert random_pairs == target

    # Smoke test: a very large record count must not overflow or hang.
    random_pairs = list(dedupe.core.randomPairs(10**10, 1))
def test_random_pair(self):
    """Seeded regression tests for dedupe.core.randomPairs.

    Snapshot values are pinned under seed 123 and differ between
    Python 2 and 3 because stdlib random sampling changed.
    """
    random.seed(123)
    if sys.version_info < (3, 0):
        target = [(0, 3), (0, 4), (2, 4), (0, 5), (6, 8)]
    else:
        target = [(0, 4), (2, 3), (0, 6), (3, 6), (0, 7)]

    random_pairs = list(dedupe.core.randomPairs(10, 5))
    assert random_pairs == target

    random.seed(123)
    if sys.version_info < (3, 0):
        target = [(265, 3429)]
    else:
        target = [(357, 8322)]

    random_pairs = list(dedupe.core.randomPairs(10**4, 1))
    assert random_pairs == target

    # Smoke test: a very large record count must not overflow or hang.
    random_pairs = list(dedupe.core.randomPairs(10**10, 1))

    # Re-seed both RNGs: the snapshot below is the seeded output of
    # randomPairs(10, 5).  (The original compared it against the
    # single pair sampled above, which could never match.)
    random.seed(123)
    numpy.random.seed(123)
    random_pairs = dedupe.core.randomPairs(10, 5)
    assert random_pairs == [( 0, 3),
                            ( 3, 8),
                            ( 4, 9),
                            ( 5, 9),
                            ( 2, 3)]

    # Requesting more pairs than exist warns and caps the sample.
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        dedupe.core.randomPairs(10, 10**6)
        assert len(w) == 1
        assert str(w[-1].message) == "Requested sample of size 1000000, only returning 45 possible pairs"

    # Astronomically large n falls back to sampling with possible
    # duplicates and emits two warnings.
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        sample = dedupe.core.randomPairs(10**40, 10)
        assert len(w) == 2
        assert str(w[0].message) == "There may be duplicates in the sample"
        assert "Asked to sample pairs from" in str(w[1].message)
        # The returned pairs must be hashable (usable in a set).
        set(sample)

    random.seed(123)
    numpy.random.seed(123)
    assert numpy.array_equal(dedupe.core.randomPairs(10**3, 1),
                             numpy.array([(292, 413)]))
def test_random_pair(self):
    """Seeded smoke/regression tests for dedupe.core.randomPairs."""
    # Sampling 10 pairs from a single record is impossible.
    self.assertRaises(ValueError, dedupe.core.randomPairs, 1, 10)
    # Asking for all 45 possible pairs of 10 records returns a
    # non-empty (truthy) sample.
    assert dedupe.core.randomPairs(10, 10)

    random.seed(123)
    numpy.random.seed(123)
    random_pairs = dedupe.core.randomPairs(10, 5)
    # Snapshot of the seeded 5-pair sample from 10 records.
    assert random_pairs == [( 0, 3),
                            ( 3, 8),
                            ( 4, 9),
                            ( 5, 9),
                            ( 2, 3)]

    # Requesting more pairs than exist warns and caps the sample.
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        dedupe.core.randomPairs(10, 10**6)
        assert len(w) == 1
        assert str(w[-1].message) == "Requested sample of size 1000000, only returning 45 possible pairs"

    # Astronomically large n falls back to sampling with possible
    # duplicates and emits two warnings.  (The original captured
    # `sample` but asserted nothing — completed to match the checks
    # this suite uses for the same call elsewhere.)
    with warnings.catch_warnings(record=True) as w:
        warnings.simplefilter("always")
        sample = dedupe.core.randomPairs(10**40, 10)
        assert len(w) == 2
        assert str(w[0].message) == "There may be duplicates in the sample"
        assert "Asked to sample pairs from" in str(w[1].message)
        # The returned pairs must be hashable (usable in a set).
        set(sample)
def within_sampler(d, N):
    """Sample N random index pairs from d, offset by d's minimum value.

    Pairs are drawn over positions 0..len(d)-1 and then shifted so
    they are expressed relative to np.min(d).
    """
    offset = np.min(d)
    pairs = dedupe.core.randomPairs(len(d), N)
    shifted = []
    for left, right in pairs:
        shifted.append((left + offset, right + offset))
    return shifted
sample_size -- Size of the sample to draw
blocked_proportion -- Proportion of the sample that will be blocked
'''
# NOTE(review): this fragment's enclosing `def` is not visible here;
# it appears to be the body of a Dedupe.sample-style method.
data = core.index(data)
# 900 is a hard-coded reservoir size — presumably a tuning constant;
# TODO confirm against Sample's definition.
self.sampled_records = Sample(data, 900)
# Split the requested sample between blocked pairs and purely
# random pairs according to blocked_proportion.
blocked_sample_size = int(blocked_proportion * sample_size)
predicates = list(self.data_model.predicates(index_predicates=False))
data = sampling.randomDeque(data)
blocked_sample_keys = sampling.dedupeBlockedSample(blocked_sample_size,
                                                   predicates,
                                                   data)
# Whatever the blocked sampler could not supply is topped up with
# uniformly random pairs.
random_sample_size = sample_size - len(blocked_sample_keys)
random_sample_keys = set(core.randomPairs(len(data),
                                          random_sample_size))
data = dict(data)
# Materialize the sampled key pairs into record pairs.
data_sample = [(data[k1], data[k2])
               for k1, k2
               in blocked_sample_keys | random_sample_keys]
data_sample = core.freezeData(data_sample)
self._loadSample(data_sample)
# NOTE(review): this fragment's enclosing `def` is not visible here;
# `identified_records`, `matched_pairs`, `data`, `common_key` and
# `training_size` are defined earlier in the original function.
unique_record_ids = set()
# a list of record_ids associated with each common_key
for record_id, record in data.items():
    unique_record_ids.add(record_id)
    identified_records[record[common_key]].append(record_id)
# all combinations of matched_pairs from each common_key group
for record_ids in identified_records.values():
    if len(record_ids) > 1:
        matched_pairs.update(itertools.combinations(sorted(record_ids), 2))
# calculate indices using dedupe.core.randomPairs to avoid
# the memory cost of enumerating all possible pairs
unique_record_ids = list(unique_record_ids)
pair_indices = randomPairs(len(unique_record_ids), training_size)
distinct_pairs = set()
for i, j in pair_indices:
    distinct_pairs.add((unique_record_ids[i],
                        unique_record_ids[j]))
# Any randomly drawn pair that happens to share a common_key is a
# match, not a distinct pair — remove those.
distinct_pairs -= matched_pairs
matched_records = [(data[key_1], data[key_2])
                   for key_1, key_2 in matched_pairs]
distinct_records = [(data[key_1], data[key_2])
                    for key_1, key_2 in distinct_pairs]
# Labeled training data in the {'match': ..., 'distinct': ...}
# shape dedupe's training consumers expect.
training_pairs = {'match': matched_records,
                  'distinct': distinct_records}