Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_full_set(self):
    """wholeSetPredicate on a set yields a single block key: str(set)."""
    expected = (str(self.s1),)
    self.assertEqual(predicates.wholeSetPredicate(self.s1), expected)
def test_set(self):
    """SimplePredicate wrapping wholeSetPredicate blocks on the stringified set."""
    pred = predicates.SimplePredicate(predicates.wholeSetPredicate, 'foo')
    colors = set(['red', 'blue', 'green'])
    record = {'foo': colors}
    assert pred(record) == (str(colors),)
# ## Active learning
# Starts the training loop. Dedupe will find the next pair of records
# it is least certain about and ask you to label them as duplicates
# or not.
# use 'y', 'n' and 'u' keys to flag duplicates
# press 'f' when you are finished
print('starting active labeling...')  # call form: identical output in Py2, valid in Py3
deduper.train(data_sample, dedupe.training.consoleLabel)

# When finished, save our training away to disk
deduper.writeTraining(r_training_file)

# ## Blocking
# Register extra blocking predicates for set-valued and lat/long fields
# before searching for blocking rules.
deduper.blocker_types.update(
    {'Custom': (dedupe.predicates.wholeSetPredicate,
                dedupe.predicates.commonSetElementPredicate),
     'LatLong': (dedupe.predicates.latLongGridPredicate,)})

time_start = time.time()
print('blocking...')

# Initialize our blocker, which determines our field weights and blocking
# predicates based on our training data
# blocker = deduper.blockingFunction(r_ppc, r_uncovered_dupes)
blocker, ppc_final, ucd_final = patent_util.blockingSettingsWrapper(
    r_ppc, r_uncovered_dupes, deduper)

# NOTE(review): execution appears to continue after this warning — confirm
# whether a sys.exit()/raise is expected when no settings are found.
if not blocker:
    print('No valid blocking settings found')
class LatLongType(FieldType):
    """Field type for latitude/longitude pairs, compared by haversine distance."""

    type = "LatLong"

    _predicate_functions = [predicates.latLongGridPredicate]

    @staticmethod
    def comparator(field_1, field_2):
        # (0.0, 0.0) is treated as a missing coordinate: no distance defined.
        missing = (0.0, 0.0)
        if missing in (field_1, field_2):
            return numpy.nan
        return haversine(field_1, field_2)
class SetType(FieldType):
    """Field type for set-valued fields.

    Blocks on element-based predicates plus TF-IDF canopy predicates,
    one per threshold in ``_canopy_thresholds``.
    """

    type = "Set"

    _predicate_functions = (dedupe.predicates.wholeSetPredicate,
                            dedupe.predicates.commonSetElementPredicate,
                            dedupe.predicates.lastSetElementPredicate,
                            dedupe.predicates.commonTwoElementsPredicate,
                            dedupe.predicates.commonThreeElementsPredicate,
                            dedupe.predicates.firstSetElementPredicate)

    _canopy_thresholds = (0.2, 0.4, 0.6, 0.8)

    def __init__(self, definition):
        super(SetType, self).__init__(definition)
        # Extend the base predicate list with one TF-IDF canopy per threshold.
        self.predicates += [predicates.TfidfPredicate(t, self.field)
                            for t in self._canopy_thresholds]
# ## Active learning
# Starts the training loop. Dedupe will find the next pair of records
# it is least certain about and ask you to label them as duplicates
# or not.
# use 'y', 'n' and 'u' keys to flag duplicates
# press 'f' when you are finished
print('starting active labeling...')  # call form: identical output in Py2, valid in Py3
deduper.train(data_sample, dedupe.training.consoleLabel)

# When finished, save our training away to disk
deduper.writeTraining(r_training_file)

# ## Blocking
# Register extra blocking predicates for set-valued and lat/long fields
# before searching for blocking rules.
deduper.blocker_types.update(
    {'Custom': (dedupe.predicates.wholeSetPredicate,
                dedupe.predicates.commonSetElementPredicate),
     'LatLong': (dedupe.predicates.latLongGridPredicate,)})

time_start = time.time()
print('blocking...')

# Initialize our blocker, which determines our field weights and blocking
# predicates based on our training data
# blocker = deduper.blockingFunction(r_ppc, r_uncovered_dupes)
blocker, ppc_final, ucd_final = patent_util.blockingSettingsWrapper(
    r_ppc, r_uncovered_dupes, deduper)

# NOTE(review): execution appears to continue after this warning — confirm
# whether a sys.exit()/raise is expected when no settings are found.
if not blocker:
    print('No valid blocking settings found')
# ## Active learning
# Starts the training loop. Dedupe will find the next pair of records
# it is least certain about and ask you to label them as duplicates
# or not.
# use 'y', 'n' and 'u' keys to flag duplicates
# press 'f' when you are finished
print('starting active labeling...')  # call form: identical output in Py2, valid in Py3
deduper.train(data_sample, dedupe.training.consoleLabel)

# When finished, save our training away to disk
deduper.writeTraining(r_training_file)

# ## Blocking
# Register extra blocking predicates for set-valued and lat/long fields
# before searching for blocking rules.
deduper.blocker_types.update(
    {'Custom': (dedupe.predicates.wholeSetPredicate,
                dedupe.predicates.commonSetElementPredicate),
     'LatLong': (dedupe.predicates.latLongGridPredicate,)})

time_start = time.time()
print('blocking...')

# Initialize our blocker, which determines our field weights and blocking
# predicates based on our training data
# blocker = deduper.blockingFunction(r_ppc, r_uncovered_dupes)
blocker, ppc_final, ucd_final = patent_util.blockingSettingsWrapper(
    r_ppc, r_uncovered_dupes, deduper)

# NOTE(review): execution appears to continue after this warning — confirm
# whether a sys.exit()/raise is expected when no settings are found.
if not blocker:
    print('No valid blocking settings found')
# ## Active learning
# Starts the training loop. Dedupe will find the next pair of records
# it is least certain about and ask you to label them as duplicates
# or not.
# use 'y', 'n' and 'u' keys to flag duplicates
# press 'f' when you are finished
print('starting active labeling...')  # call form: identical output in Py2, valid in Py3
deduper.train(data_sample, dedupe.training.consoleLabel)

# When finished, save our training away to disk
deduper.writeTraining(r_training_file)

# ## Blocking
# Register extra blocking predicates for set-valued and lat/long fields
# before searching for blocking rules.
deduper.blocker_types.update(
    {'Custom': (dedupe.predicates.wholeSetPredicate,
                dedupe.predicates.commonSetElementPredicate),
     'LatLong': (dedupe.predicates.latLongGridPredicate,)})

time_start = time.time()
print('blocking...')

# Initialize our blocker, which determines our field weights and blocking
# predicates based on our training data
# blocker = deduper.blockingFunction(r_ppc, r_uncovered_dupes)
blocker, ppc_final, ucd_final = patent_util.blockingSettingsWrapper(
    r_ppc, r_uncovered_dupes, deduper)

# NOTE(review): execution appears to continue after this warning — confirm
# whether a sys.exit()/raise is expected when no settings are found.
if not blocker:
    print('No valid blocking settings found')
from .base import FieldType
from dedupe import predicates
from simplecosine.cosine import CosineSetSimilarity
class SetType(FieldType):
    """Field type for set-valued fields.

    Combines element/cardinality blocking predicates with TF-IDF index
    predicates, one per threshold in ``_index_thresholds``.
    """

    type = "Set"

    _predicate_functions = (predicates.wholeSetPredicate,
                            predicates.commonSetElementPredicate,
                            predicates.lastSetElementPredicate,
                            predicates.commonTwoElementsPredicate,
                            predicates.commonThreeElementsPredicate,
                            predicates.magnitudeOfCardinality,
                            predicates.firstSetElementPredicate)

    _index_predicates = (predicates.TfidfSetSearchPredicate,
                         predicates.TfidfSetCanopyPredicate)

    _index_thresholds = (0.2, 0.4, 0.6, 0.8)

    def __init__(self, definition):
        super(SetType, self).__init__(definition)
        # Default to an empty training corpus when none is supplied.
        definition.setdefault('corpus', [])
# ## Active learning
# Starts the training loop. Dedupe will find the next pair of records
# it is least certain about and ask you to label them as duplicates
# or not.
# use 'y', 'n' and 'u' keys to flag duplicates
# press 'f' when you are finished
print('starting active labeling...')  # call form: identical output in Py2, valid in Py3
deduper.train(data_sample, dedupe.training.consoleLabel)

# When finished, save our training away to disk
deduper.writeTraining(r_training_file)

# ## Blocking
# Register extra blocking predicates for set-valued and lat/long fields
# before searching for blocking rules.
deduper.blocker_types.update(
    {'Custom': (dedupe.predicates.wholeSetPredicate,
                dedupe.predicates.commonSetElementPredicate),
     'LatLong': (dedupe.predicates.latLongGridPredicate,)})

time_start = time.time()
print('blocking...')

# Initialize our blocker, which determines our field weights and blocking
# predicates based on our training data
# blocker = deduper.blockingFunction(r_ppc, r_uncovered_dupes)
blocker, ppc_final, ucd_final = patent_util.blockingSettingsWrapper(
    r_ppc, r_uncovered_dupes, deduper)

# NOTE(review): execution appears to continue after this warning — confirm
# whether a sys.exit()/raise is expected when no settings are found.
if not blocker:
    print('No valid blocking settings found')
# ## Active learning
# Starts the training loop. Dedupe will find the next pair of records
# it is least certain about and ask you to label them as duplicates
# or not.
# use 'y', 'n' and 'u' keys to flag duplicates
# press 'f' when you are finished
print('starting active labeling...')  # call form: identical output in Py2, valid in Py3
deduper.train(data_sample, dedupe.training.consoleLabel)

# When finished, save our training away to disk
deduper.writeTraining(r_training_file)

# ## Blocking
# Register extra blocking predicates for set-valued and lat/long fields
# before searching for blocking rules.
deduper.blocker_types.update(
    {'Custom': (dedupe.predicates.wholeSetPredicate,
                dedupe.predicates.commonSetElementPredicate),
     'LatLong': (dedupe.predicates.latLongGridPredicate,)})

time_start = time.time()
print('blocking...')

# Initialize our blocker, which determines our field weights and blocking
# predicates based on our training data
# blocker = deduper.blockingFunction(r_ppc, r_uncovered_dupes)
blocker, ppc_final, ucd_final = patent_util.blockingSettingsWrapper(
    r_ppc, r_uncovered_dupes, deduper)

# NOTE(review): execution appears to continue after this warning — confirm
# whether a sys.exit()/raise is expected when no settings are found.
if not blocker:
    print('No valid blocking settings found')