Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def load_test_dataset(self):
    """Populate the test database with two 'scm' identities.

    Both calls use the same source, e-mail address and name. Only the
    first one passes an extra trailing 'jsmith' argument (presumably
    the username -- confirm against `api.add_identity`).
    """
    source = 'scm'
    email = 'jsmith@example.com'
    name = 'John Smith'

    api.add_identity(self.db, source, email, name, 'jsmith')
    api.add_identity(self.db, source, email, name)
def test_unify_with_blacklist(self):
    """Test unify method using a blacklist"""

    # Blacklist two values before running the command
    for excluded in ('Jane Rae Doe', 'jsmith@example.com'):
        api.add_to_matching_blacklist(self.db, excluded)

    # Six unique identities are expected before unifying
    self.assertEqual(len(api.unique_identities(self.db)), 6)

    result = self.cmd.unify(matching='default')
    self.assertEqual(result, CMD_SUCCESS)

    # No match was found, so the number of unique identities
    # must be the same as before
    self.assertEqual(len(api.unique_identities(self.db)), 6)
def load_test_dataset(self):
    """Fill the test database with a set of unique identities.

    Each fixture row holds the keyword arguments of the identity that
    is added first, followed by the identities added afterwards with
    the uuid returned by that first call.
    """
    fixture = [
        ({'source': 'scm', 'email': 'jsmith@example.com',
          'name': 'John Smith'},
         [{'source': 'scm', 'name': 'John Smith'},
          {'source': 'scm', 'username': 'jsmith'}]),
        ({'source': 'alt', 'name': 'J. Smith', 'username': 'john_smith'},
         [{'source': 'alt', 'name': 'John Smith', 'username': 'jsmith'},
          {'source': 'alt', 'email': 'jsmith'}]),
        ({'source': 'scm', 'name': 'Jane Rae'},
         [{'source': 'mls', 'email': 'jane.rae@example.net',
           'name': 'Jane Rae Doe'}]),
        ({'source': 'scm', 'name': 'J. Smith', 'username': 'john_smith'},
         [{'source': 'scm', 'username': 'john_smith'},
          {'source': 'mls', 'username': 'Smith. J'},
          {'source': 'mls', 'email': 'JSmith@example.com',
           'name': 'Smith. J'}]),
        ({'source': 'mls', 'email': 'jrae@example.net',
          'name': 'Jane Rae Doe'},
         []),
    ]

    # Rows and their entries are processed in order, so the sequence
    # of API calls is the same as writing them out one by one
    for first, others in fixture:
        parent_uuid = api.add_identity(self.db, **first)

        for other in others:
            api.add_identity(self.db, uuid=parent_uuid, **other)
def test_period_ranges(self):
"""Check whether enrollments cannot be listed giving invalid period ranges"""
self.assertRaisesRegexp(ValueError, ENROLLMENT_PERIOD_INVALID_ERROR,
api.enrollments, self.db, 'John Smith', 'Example',
datetime.datetime(2001, 1, 1),
datetime.datetime(1999, 1, 1))
exc = ENROLLMENT_PERIOD_OUT_OF_BOUNDS_ERROR % {'type' : 'start date',
'date' : '1899-12-31 23:59:59'}
self.assertRaisesRegexp(ValueError, exc,
api.delete_enrollment, self.db, 'John Smith', 'Example',
datetime.datetime(1899, 12, 31, 23, 59, 59))
exc = ENROLLMENT_PERIOD_OUT_OF_BOUNDS_ERROR % {'type' : 'start date',
'date' : '2100-01-01 00:00:01'}
self.assertRaisesRegexp(ValueError, exc,
api.delete_enrollment, self.db, 'John Smith', 'Example',
datetime.datetime(2100, 1, 1, 0, 0, 1))
exc = ENROLLMENT_PERIOD_OUT_OF_BOUNDS_ERROR % {'type' : 'end date',
def test_delete_domains(self):
    """Check whether it deletes a set of domains"""

    # First, add a set of organizations, including some domains.
    # Every organization is created right before its own domains.
    fixture = (('Example', ('example.com', 'example.org')),
               ('Bitergia', ('bitergia.com',)),
               ('LibreSoft', ()))

    for organization, domains in fixture:
        api.add_organization(self.db, organization)

        for domain in domains:
            api.add_domain(self.db, organization, domain)

    # Delete some domains
    api.delete_domain(self.db, 'Example', 'example.org')
    api.delete_domain(self.db, 'Bitergia', 'bitergia.com')

    with self.db.connect() as session:
        # Only 'example.com' should remain assigned to 'Example'
        query = session.query(Domain).join(Organization)
        remaining = query.filter(Organization.name == 'Example').all()

        self.assertEqual(len(remaining), 1)
        self.assertEqual(remaining[0].domain, 'example.com')
api.add_organization(self.db, 'Example')
api.add_enrollment(self.db, 'John Smith', 'Example')
api.add_enrollment(self.db, 'John Doe', 'Example')
api.add_organization(self.db, 'Bitergia')
api.add_enrollment(self.db, 'John Smith', 'Bitergia')
api.add_enrollment(self.db, 'John Smith', 'Bitergia',
datetime.datetime(1999, 1, 1),
datetime.datetime(2000, 1, 1))
api.add_organization(self.db, 'LibreSoft')
api.add_enrollment(self.db, 'John Doe', 'LibreSoft')
api.add_enrollment(self.db, 'Jane Rae', 'LibreSoft')
# Delete some enrollments
api.delete_enrollment(self.db, 'John Doe', 'LibreSoft')
api.delete_enrollment(self.db, 'John Doe', 'Example')
with self.db.connect() as session:
enrollments = session.query(Enrollment).join(Organization).\
filter(Organization.name == 'LibreSoft').all()
self.assertEqual(len(enrollments), 1)
self.assertEqual(enrollments[0].uidentity.uuid, 'Jane Rae')
enrollments = session.query(Enrollment).join(Organization).\
filter(Organization.name == 'Example').all()
self.assertEqual(len(enrollments), 1)
self.assertEqual(enrollments[0].uidentity.uuid, 'John Smith')
# Delete enrollments from Bitergia
api.delete_enrollment(self.db, 'John Smith', 'Bitergia')
with self.db.connect() as session:
if not matching:
matching = 'default'
strict = not no_strict_matching
self.recovery = recovery
try:
blacklist = api.blacklist(self.db)
matcher = create_identity_matcher(matching, blacklist,
sources, strict)
except MatcherNotSupportedError as e:
self.error(str(e))
return e.code
uidentities = api.unique_identities(self.db)
try:
self.__unify_unique_identities(uidentities, matcher,
fast_matching, interactive)
self.__display_stats()
except MatcherNotSupportedError as e:
self.error(str(e))
return e.code
except Exception as e:
self.__display_stats()
raise RuntimeError(str(e))
return CMD_SUCCESS
def registry(self, term=None):
    """List organizations and domains.

    Without a term, the entries returned by the registry API are
    shown as they come. When 'term' is set, it is forwarded to the
    API so only matching organizations and domains are listed.

    :param term: term to match

    :returns: CMD_SUCCESS when the listing was displayed; the code
        carried by the NotFoundError otherwise
    """
    try:
        organizations = api.registry(self.db, term)
        self.display('organizations.tmpl', organizations=organizations)
    except NotFoundError as error:
        # Report the problem and hand its code back to the caller
        self.error(str(error))
        return error.code
    else:
        return CMD_SUCCESS
"""
uidentities = {}
uids = api.unique_identities(self.db, source=source)
for uid in uids:
enrollments = [rol.to_dict()
for rol in api.enrollments(self.db, uuid=uid.uuid)]
u = uid.to_dict()
u['identities'].sort(key=lambda x: x['id'])
uidentities[uid.uuid] = u
uidentities[uid.uuid]['enrollments'] = enrollments
blacklist = [mb.excluded for mb in api.blacklist(self.db)]
obj = {'time': str(datetime.datetime.now()),
'source': source,
'blacklist': blacklist,
'organizations': {},
'uidentities': uidentities}
return json.dumps(obj, default=self._json_encoder,
indent=4, separators=(',', ': '),
sort_keys=True)
def add(self, entry):
    """Add entries to the blacklist.

    Stores the given 'entry' in the matching blacklist. Empty or
    None entries are skipped and reported as a success.

    :param entry: entry to add to the blacklist

    :returns: CMD_SUCCESS when the entry was added or skipped; the
        code carried by the AlreadyExistsError otherwise
    :raises RuntimeError: when the API rejects the entry with an
        InvalidValueError
    """
    if entry:
        try:
            api.add_to_matching_blacklist(self.db, entry)
        except InvalidValueError as error:
            # This should never happen: empty and None entries
            # were already filtered out by the check above
            raise RuntimeError(str(error))
        except AlreadyExistsError as error:
            self.error("%s already exists in the registry" % entry)
            return error.code

    return CMD_SUCCESS