Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def canonicalImport(filename):
    """Read a CSV file into a dict of hashable records.

    Each row is preprocessed field-by-field with ``exampleIO.preProcess``
    and stored as a ``dedupe.core.frozendict`` keyed by ``filename + str(i)``
    (i is the 0-based row index), so keys are unique across files.

    Returns:
        (data_d, fieldnames): the record dict and the CSV header list.
    """
    preProcess = exampleIO.preProcess
    data_d = {}
    with open(filename) as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            # row.items() replaces the Python-2 compat viewitems(row);
            # identical behavior on Python 3.
            clean_row = [(k, preProcess(v)) for k, v in row.items()]
            data_d[filename + str(i)] = dedupe.core.frozendict(clean_row)
    # fieldnames is cached once the header has been read, so it is
    # still available after the file is closed.
    return data_d, reader.fieldnames
def test_hash_is_order_insensitive(self):
    """frozendict equality and hashing must ignore key insertion order."""
    frozendict = dedupe.core.frozendict

    forward = {'smtp': 21, 'dict': 2628}
    backward = {'dict': 2628, 'smtp': 21}
    # Sanity check: plain dicts with the same items are equal regardless
    # of construction order.
    assert forward == backward

    frozen_forward = frozendict(forward)
    frozen_backward = frozendict(backward)
    # Fresh instances built from either ordering compare equal...
    assert frozendict(forward) == frozendict(backward)
    # ...and equal frozendicts must produce identical hashes.
    assert hash(frozen_forward) == hash(frozen_backward)
def canonicalImport(filename):
    """Read a CSV file into a dict of hashable records keyed by row index.

    Each row is preprocessed field-by-field with ``exampleIO.preProcess``
    and stored as a ``dedupe.core.frozendict`` under its 0-based row index.

    Returns:
        (data_d, fieldnames): the record dict and the CSV header list.
    """
    preProcess = exampleIO.preProcess
    data_d = {}
    with open(filename) as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            # row.items() replaces the Python-2 compat viewitems(row);
            # identical behavior on Python 3.
            clean_row = [(k, preProcess(v)) for k, v in row.items()]
            data_d[i] = dedupe.core.frozendict(clean_row)
    # fieldnames is cached after iteration, so it survives file close.
    return data_d, reader.fieldnames
def freezeData(data):  # pragma: no cover
    """Turn an iterable of record pairs into pairs of hashable frozendicts."""
    freeze = frozendict  # bind once for the comprehension below
    return [(freeze(left), freeze(right)) for left, right in data]
def techLocatorImport(filename):
    """Read a tech-locator CSV into a dict of normalized, hashable records.

    Every cell has runs of spaces collapsed, newlines replaced by spaces,
    surrounding whitespace/quotes stripped, and is lowercased. Records are
    ``dedupe.core.frozendict`` instances keyed by 0-based row index.

    Returns:
        (data_d, header): the record dict and the header row.
    """
    data_d = {}
    duplicates_d = {}
    with open(filename) as f:
        reader = csv.reader(f, delimiter=',', quotechar='"')
        # BUG FIX: reader.next() is Python 2 only; next(reader) works on
        # both Python 2 and 3.
        header = next(reader)
        for i, row in enumerate(reader):
            instance = {}
            for j, col in enumerate(row):
                # NOTE: spaces are collapsed *before* newlines become
                # spaces, so "a \n b" keeps multiple spaces -- preserved
                # as-is to avoid changing existing normalization.
                col = re.sub(r' +', ' ', col)
                col = re.sub(r'\n', ' ', col)
                instance[header[j]] = col.strip().strip('"').strip("'").lower()
            data_d[i] = dedupe.core.frozendict(instance)
    return (data_d, header)
"""
Read in our data from a CSV file and create a dictionary of records,
where the key is a unique record ID and each value is a
[frozendict](http://code.activestate.com/recipes/414283-frozen-dictionaries/)
(hashable dictionary) of the row fields.
"""
data_d = {}
for fileno,filename in enumerate(filenames):
with open(filename) as f:
reader = csv.DictReader(f)
for row in reader:
clean_row = [(k, preProcess(v)) for (k, v) in row.items()]
clean_row.append(('dataset',fileno))
row_id = int(row['Id'])
data_d[row_id] = dedupe.core.frozendict(clean_row)
return data_d
Returns a random sample of pairs of donors of a given size from a MySQL table.
Depending on your database engine, you will need to come up with a similar function.
id_column must contain unique, sequential integers starting at 0 or 1
"""
# NOTE(review): DB-API parameter substitution quotes *values*; using %s
# placeholders for a column/table identifier produces quoted strings in the
# SQL (e.g. SELECT MAX('donor_id') FROM 'donors'), which is not valid --
# confirm against the MySQL driver in use.
c.execute("SELECT MAX(%s) FROM %s" , (id_column, table))
# NOTE(review): subscripting .values() is Python 2 only; in Python 3 dict
# views are not indexable -- verify the target interpreter.
num_records = c.fetchone().values()[0]
# Sample record-ID pairs over [1, num_records] (IDs are 1-indexed here).
random_pairs = dedupe.randomPairs(num_records, sample_size, zero_indexed=False)
temp_d = {}
c.execute(donor_select)
# Build an in-memory id -> frozendict lookup for every donor row.
for row in c.fetchall() :
temp_d[int(row[id_column])] = dedupe.core.frozendict(row)
def random_pair_generator():
# Yield ((id1, record1), (id2, record2)) tuples for each sampled pair.
for record_id_1, record_id_2 in random_pairs:
yield ((record_id_1, temp_d[record_id_1]),
(record_id_2, temp_d[record_id_2]))
# NOTE(review): `record_pairs` is not defined in this scope -- this looks
# like it should be `tuple(pair for pair in random_pair_generator())` (or
# simply tuple(random_pair_generator())); as written it raises NameError
# when the tuple is built. Left unchanged because the enclosing def is not
# visible in this chunk.
return tuple(record_pairs for pair in random_pair_generator())