Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_random_pair_match(self):
    """Check ``dedupe.core.randomPairsMatch`` in four scenarios.

    * a zero among the first two arguments raises ``ValueError``;
    * a request that fits returns exactly the requested number of pairs;
    * asking for 200 pairs with sizes 10 and 10 emits a warning and
      returns 100 pairs;
    * with both RNGs seeded to 123, the ten returned pairs equal a
      fixed, known set.
    """
    # Each of these size combinations contains a zero and must be rejected.
    self.assertRaises(ValueError, dedupe.core.randomPairsMatch, 1, 0, 10)
    self.assertRaises(ValueError, dedupe.core.randomPairsMatch, 0, 0, 10)
    self.assertRaises(ValueError, dedupe.core.randomPairsMatch, 0, 1, 10)

    self.assertEqual(len(dedupe.core.randomPairsMatch(100, 100, 100)), 100)
    self.assertEqual(len(dedupe.core.randomPairsMatch(10, 10, 99)), 99)

    expected_warning = "Requested sample of size 200, only returning 100 possible pairs"
    with warnings.catch_warnings(record=True) as caught:
        # "always" keeps the warning from being suppressed by an earlier
        # identical warning raised elsewhere in the test run.
        warnings.simplefilter("always")
        pairs = dedupe.core.randomPairsMatch(10, 10, 200)

    # Search every recorded warning instead of indexing caught[0]: a missing
    # warning then fails as an assertion (not an IndexError), and an
    # unrelated warning recorded first cannot mask the expected one.
    self.assertIn(expected_warning, [str(item.message) for item in caught])
    self.assertEqual(len(pairs), 100)

    # Seed both generators so the sampled pairs are reproducible.
    random.seed(123)
    numpy.random.seed(123)
    pairs = dedupe.core.randomPairsMatch(10, 10, 10)
    self.assertEqual(
        pairs,
        {
            (7, 3), (3, 3), (2, 9), (6, 0), (2, 0),
            (1, 9), (9, 4), (0, 4), (1, 0), (1, 1),
        },
    )
def test_random_pair_match(self):
    """Check ``dedupe.core.randomPairsMatch`` on invalid, fitting and oversized requests.

    * a zero among the first two arguments raises ``ValueError``;
    * a request that fits returns exactly the requested number of pairs;
    * asking for 200 pairs with sizes 10 and 10 emits a warning and
      returns 100 pairs.
    """
    # Each of these size combinations contains a zero and must be rejected.
    self.assertRaises(ValueError, dedupe.core.randomPairsMatch, 1, 0, 10)
    self.assertRaises(ValueError, dedupe.core.randomPairsMatch, 0, 0, 10)
    self.assertRaises(ValueError, dedupe.core.randomPairsMatch, 0, 1, 10)

    self.assertEqual(len(dedupe.core.randomPairsMatch(100, 100, 100)), 100)
    self.assertEqual(len(dedupe.core.randomPairsMatch(10, 10, 99)), 99)

    expected_warning = "Requested sample of size 200, only returning 100 possible pairs"
    with warnings.catch_warnings(record=True) as caught:
        # "always" keeps the warning from being suppressed by an earlier
        # identical warning raised elsewhere in the test run.
        warnings.simplefilter("always")
        pairs = dedupe.core.randomPairsMatch(10, 10, 200)

    # Search every recorded warning instead of indexing caught[0]: a missing
    # warning then fails as an assertion (not an IndexError), and an
    # unrelated warning recorded first cannot mask the expected one.
    self.assertIn(expected_warning, [str(item.message) for item in caught])
    self.assertEqual(len(pairs), 100)
def test_random_pair_match(self):
    """Check the length and seeded output of ``dedupe.core.randomPairsMatch``.

    The result is materialised with ``list`` before every check. With the
    ``random`` module seeded to 123, a request for 10 pairs must equal a
    fixed list; the expected list differs between Python 2 and Python 3.
    A request for 0 pairs must produce an empty list.
    """
    assert len(list(dedupe.core.randomPairsMatch(100, 100, 100))) == 100
    assert len(list(dedupe.core.randomPairsMatch(10, 10, 99))) == 99

    # One seed call is sufficient; re-seeding with the same value
    # immediately afterwards leaves the generator in the same state.
    random.seed(123)
    if sys.version_info < (3, 0):
        target = [(0, 5), (0, 8), (4, 0), (1, 0), (9, 0),
                  (0, 3), (5, 3), (3, 3), (8, 5), (1, 5)]
    else:
        target = [(0, 6), (3, 4), (1, 1), (9, 8), (5, 2),
                  (1, 3), (0, 4), (4, 8), (6, 8), (7, 1)]

    pairs = list(dedupe.core.randomPairsMatch(10, 10, 10))
    assert pairs == target

    # Requesting zero pairs yields nothing.
    pairs = list(dedupe.core.randomPairsMatch(10, 10, 0))
    assert pairs == []
# NOTE(review): the enclosing `def` for these statements is not visible in
# this excerpt and the original indentation has been lost, so the extent of
# each `with` body below is not delimited -- confirm against upstream.
# Record all warnings while requesting 200 pairs with sizes 10 and 10.
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
pairs = dedupe.core.randomPairsMatch(10, 10, 200)
# The first recorded warning must carry exactly this message.
assert str(w[0].message) == "Requested sample of size 200, only returning 100 possible pairs"
# Here `len` is applied directly to the returned object.
assert len(pairs) == 100
# Same request and message check again (no length check this time).
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
pairs = dedupe.core.randomPairsMatch(10, 10, 200)
assert str(w[0].message) == "Requested sample of size 200, only returning 100 possible pairs"
# Seed both the stdlib and numpy generators before the deterministic check.
random.seed(123)
numpy.random.seed(123)
pairs = dedupe.core.randomPairsMatch(10, 10, 10)
# The result is compared against a set, so ordering is not checked.
assert pairs == set([(7, 3), (3, 3), (2, 9), (6, 0), (2, 0),
(1, 9), (9, 4), (0, 4), (1, 0), (1, 1)])
def test_random_pair_match(self):
    """Check ``dedupe.core.randomPairsMatch`` on invalid, fitting and oversized requests.

    * a zero among the first two arguments raises ``ValueError``;
    * a request that fits returns exactly the requested number of pairs;
    * asking for 200 pairs with sizes 10 and 10 emits a warning and
      returns 100 pairs.
    """
    # Each of these size combinations contains a zero and must be rejected.
    self.assertRaises(ValueError, dedupe.core.randomPairsMatch, 1, 0, 10)
    self.assertRaises(ValueError, dedupe.core.randomPairsMatch, 0, 0, 10)
    self.assertRaises(ValueError, dedupe.core.randomPairsMatch, 0, 1, 10)

    self.assertEqual(len(dedupe.core.randomPairsMatch(100, 100, 100)), 100)
    self.assertEqual(len(dedupe.core.randomPairsMatch(10, 10, 99)), 99)

    expected_warning = "Requested sample of size 200, only returning 100 possible pairs"
    with warnings.catch_warnings(record=True) as caught:
        # "always" keeps the warning from being suppressed by an earlier
        # identical warning raised elsewhere in the test run.
        warnings.simplefilter("always")
        pairs = dedupe.core.randomPairsMatch(10, 10, 200)

    # Search every recorded warning instead of indexing caught[0]: a missing
    # warning then fails as an assertion (not an IndexError), and an
    # unrelated warning recorded first cannot mask the expected one.
    self.assertIn(expected_warning, [str(item.message) for item in caught])
    self.assertEqual(len(pairs), 100)
def test_random_pair_match(self):
    """Check the length and seeded output of ``dedupe.core.randomPairsMatch``.

    The result is materialised with ``list`` before every check. With the
    ``random`` module seeded to 123, a request for 10 pairs must equal a
    fixed list; the expected list differs between Python 2 and Python 3.
    A request for 0 pairs must produce an empty list.
    """
    assert len(list(dedupe.core.randomPairsMatch(100, 100, 100))) == 100
    assert len(list(dedupe.core.randomPairsMatch(10, 10, 99))) == 99

    # One seed call is sufficient; re-seeding with the same value
    # immediately afterwards leaves the generator in the same state.
    random.seed(123)
    if sys.version_info < (3, 0):
        target = [(0, 5), (0, 8), (4, 0), (1, 0), (9, 0),
                  (0, 3), (5, 3), (3, 3), (8, 5), (1, 5)]
    else:
        target = [(0, 6), (3, 4), (1, 1), (9, 8), (5, 2),
                  (1, 3), (0, 4), (4, 8), (6, 8), (7, 1)]

    pairs = list(dedupe.core.randomPairsMatch(10, 10, 10))
    assert pairs == target

    # Requesting zero pairs yields nothing.
    pairs = list(dedupe.core.randomPairsMatch(10, 10, 0))
    assert pairs == []
def sample(self, data_1, data_2, blocked_proportion, sample_size):
    """Return a list of ``(record_1, record_2)`` pairs drawn from both data sets.

    ``int(blocked_proportion * sample_size)`` pairs are requested from
    ``sampling.linkBlockedSample`` using the data model's non-index
    predicates; whatever remains of ``sample_size`` after counting the
    blocked keys actually returned is requested from
    ``core.randomPairsMatch``. The second element of every random key is
    shifted by ``len(data_1)`` before the two key sets are united and
    looked up in ``data_1`` / ``data_2``.

    NOTE(review): the shift implies ``data_2`` is keyed starting at
    ``len(data_1)`` -- that re-keying is not visible in this function;
    confirm against the caller.
    """
    shift = len(data_1)
    n_blocked = int(blocked_proportion * sample_size)
    predicate_list = list(self.data_model.predicates(index_predicates=False))

    shuffled_1 = sampling.randomDeque(data_1)
    shuffled_2 = sampling.randomDeque(data_2)

    blocked_keys = sampling.linkBlockedSample(
        n_blocked, predicate_list, shuffled_1, shuffled_2
    )

    # Top up with purely random pairs to reach the requested sample size.
    n_random = sample_size - len(blocked_keys)
    unshifted_keys = core.randomPairsMatch(
        len(shuffled_1), len(shuffled_2), n_random
    )

    random_keys = set()
    for left, right in unshifted_keys:
        random_keys.add((left, right + shift))

    chosen_keys = blocked_keys | random_keys
    return [(data_1[left], data_2[right]) for left, right in chosen_keys]
# NOTE(review): the enclosing `def` is not visible in this excerpt; `self`,
# `data_1`, `data_2`, `offset`, `blocked_proportion` and `sample_size` are
# all defined outside the lines shown, and the original indentation is lost.
# Rebuild data_2 through core.index with `offset`
# (presumably re-keys data_2 to start at `offset` -- confirm in core.index).
data_2 = core.index(data_2, offset)
self.sampled_records_2 = Sample(data_2, 500)
# Number of pairs to request from the blocked sampler (truncated to int).
blocked_sample_size = int(blocked_proportion * sample_size)
predicates = list(self.data_model.predicates(index_predicates=False))
deque_1 = sampling.randomDeque(data_1)
deque_2 = sampling.randomDeque(data_2)
blocked_sample_keys = sampling.linkBlockedSample(blocked_sample_size,
predicates,
deque_1,
deque_2)
# Whatever the blocked sampler did not deliver is requested as random pairs.
random_sample_size = sample_size - len(blocked_sample_keys)
random_sample_keys = core.randomPairsMatch(len(deque_1),
len(deque_2),
random_sample_size)
# Shift the second element of each random key by `offset` before the
# keys are used to index data_2 below.
random_sample_keys = {(a, b + offset)
for a, b in random_sample_keys}
# Lazily pair up the records for the union of blocked and random keys.
data_sample = ((data_1[k1], data_2[k2])
for k1, k2
in blocked_sample_keys | random_sample_keys)
data_sample = core.freezeData(data_sample)
self._loadSample(data_sample)